This topic explains how the Gather operator works and the actions you can take in its Properties view.
The Gather operator receives input from two or more streams and concatenates tuples that share the same key value. The Gather operator is often used when a tuple needs to be copied into multiple branches of an application, and then rejoined later on for further processing.
The Gather operator releases a new tuple, on a single output stream, that can be a function of any of the input tuple fields. By default, Gather buffers input tuples for each value of the key field until the value has been seen on each of the input streams; only then is the output tuple released. Matching tuples may arrive in any order (input need not be synchronized), and tuples are emitted in the order that they are fully matched. You can change the default behavior by setting a timeout interval.
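The default buffering rule can be modeled in a few lines of Python. This is a minimal illustrative sketch, not StreamBase's implementation. The `Gather` class, its `enqueue` method, and the 0-based port indexes are hypothetical. Tuples are plain dicts, and timeout handling is left out.

```python
from typing import Any, Callable, Dict, List, Optional


class Gather:
    """Minimal model of Gather's default (no-timeout) behavior.

    Hypothetical sketch: one buffer slot per (key value, input port), ports 0-based.
    """

    def __init__(self, num_ports: int, key_field: str,
                 output: Callable[[List[dict]], dict]) -> None:
        self.num_ports = num_ports
        self.key_field = key_field
        self.output = output  # builds the output tuple from one tuple per port
        # key value -> buffered tuple for each port (None = key not seen there yet)
        self.buffers: Dict[Any, List[Optional[dict]]] = {}

    def enqueue(self, port: int, tup: dict) -> Optional[dict]:
        """Buffer a tuple; return an output tuple once its key is seen on every port."""
        key = tup[self.key_field]
        slots = self.buffers.setdefault(key, [None] * self.num_ports)
        slots[port] = tup  # a duplicate key on the same port replaces the earlier tuple
        if all(s is not None for s in slots):
            del self.buffers[key]  # matched tuples leave the buffer
            return self.output(slots)
        return None  # unmatched tuples stay buffered


# Usage: two ports, gather on symbol, sum the volumes (hypothetical values).
g = Gather(2, "symbol", lambda t: {"symbol": t[0]["symbol"],
                                   "volume": t[0]["volume"] + t[1]["volume"]})
print(g.enqueue(0, {"symbol": "AAPL", "volume": 100}))  # None
print(g.enqueue(1, {"symbol": "AAPL", "volume": 50}))   # {'symbol': 'AAPL', 'volume': 150}
```

Output is produced only when the last missing port for a key fills in. Because of that, outputs naturally come out in the order that keys become fully matched, and arrival order across ports does not matter.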
Note
Duplicate keys should be avoided or used with caution, because they can have unexpected results. If two or more tuples with duplicate keys are received on the same stream before the first is emitted, the second tuple replaces the first in the buffer (unless timeout is enabled, in which case the incoming tuple is ignored).
Also, keep in mind when designing Gather operations that unmatched input tuples remain buffered at run time. Gather operations that accumulate large numbers of unmatched tuples can cause the buffers to grow excessively large.
This topic describes how to set Gather operator properties in the Properties View, and traces the execution of a gather operation.
Name: Every application component must have a unique name. Use this field to specify or change the component's name. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
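The naming rule can be expressed as a regular expression. The Python sketch below makes two assumptions. First, "alphabetic characters" is taken to mean ASCII letters. Second, the function `is_valid_component_name` is hypothetical and is not StreamBase's own validator. It also does not check the uniqueness requirement.

```python
import re

# Assumed reading of the rule: ASCII letters, digits, and underscores only,
# and the first character must be a letter or an underscore.
_NAME_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_component_name(name: str) -> bool:
    """Check the character rule only; uniqueness is not checked here."""
    return _NAME_RE.fullmatch(name) is not None


print(is_valid_component_name("Gather_1"))   # True
print(is_valid_component_name("1Gather"))    # False: starts with a digit
print(is_valid_component_name("my-gather"))  # False: contains a hyphen
```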
Enable Error Output Port: Check this box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.
Description: Optionally, enter a brief description of the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
In the Gather Settings tab, specify the number of streams that will participate in the gather, and the field by which to gather.
You can limit buffering by selecting the Enable timeout option. Specify a timeout interval and a field on which gathered tuples should be sorted. The field must have an ordered data type, so you cannot choose a string field, for example. The timestamp data type is typically used. The timeout interval is measured in units appropriate to the specified field's data type. For an int or double field, the interval is expressed in the same units as the field's values. For a timestamp field, it is measured in seconds.
The timeout limits how long Gather continues to buffer tuples. Instead of waiting indefinitely until the key value has been seen on all the input streams, Gather first establishes the current time by listening for a tuple on each input port. It then waits until either the gather conditions are satisfied or a timeout occurs, whichever comes first. A timeout occurs when a tuple arrives whose time, compared with the initial time established earlier, exceeds the specified timeout interval. When the time is updated this way, the Gather operator emits a tuple even though the key has not been seen on every input stream. The fields of the missing input tuples are set to null when the output expressions are evaluated.
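The timeout check can be sketched as a small Python function. The rule it encodes is one interpretation of the description above, not a statement of StreamBase internals. Under this interpretation, the operator's current time is the minimum of the latest time seen on each input port, and it is unknown until every port has seen a tuple. A buffered key times out once that current time is more than the interval past the key's first arrival. The function name `expired_keys` is hypothetical.

```python
from typing import Dict, List, Optional


def expired_keys(start_times: Dict[str, float],
                 latest_per_port: List[Optional[float]],
                 interval: float) -> List[str]:
    """Return the buffered keys whose timeout has fired (assumed model).

    The current time is taken to be the minimum of the latest time seen on each
    port, and it is not established until every port has seen at least one tuple.
    """
    if any(t is None for t in latest_per_port):
        return []  # current time not yet established on every port
    now = min(t for t in latest_per_port if t is not None)
    return [key for key, start in start_times.items() if now - start > interval]


# Hypothetical key "A" first buffered on port 1 at time 0, with a 10-unit interval.
print(expired_keys({"A": 0.0}, [None, 0.0], 10.0))   # []: port 0 has not reported yet
print(expired_keys({"A": 0.0}, [11.0, 0.0], 10.0))   # []: port 1 is still at time 0
print(expired_keys({"A": 0.0}, [11.0, 12.0], 10.0))  # ['A']: both ports are past 10
```

Taking the minimum across ports means a single fast stream cannot force timeouts on its own. The slowest port has to catch up first.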
As noted previously, enabling timeout also changes the way duplicate tuples are handled. Without timeout, if a tuple arrives whose key field matches an existing tuple on the same stream, the new tuple replaces the existing tuple. With timeout, the opposite happens: the existing tuple remains and the new one is lost.
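The two duplicate-key policies differ only in which tuple survives in a port's buffer. The Python sketch below is illustrative only. `buffer_tuple` is a hypothetical helper, and the volumes 300 and 289 are borrowed from the first example later in this topic.

```python
from typing import Any, Dict


def buffer_tuple(port_buffer: Dict[Any, dict], key: Any, tup: dict,
                 timeout_enabled: bool) -> dict:
    """Store `tup` in one port's buffer and return the tuple that is kept."""
    if timeout_enabled and key in port_buffer:
        return port_buffer[key]  # with timeout: existing tuple kept, new one dropped
    port_buffer[key] = tup       # without timeout: new tuple replaces any duplicate
    return tup


no_timeout: Dict[Any, dict] = {}
buffer_tuple(no_timeout, "IBM", {"volume": 300}, timeout_enabled=False)
buffer_tuple(no_timeout, "IBM", {"volume": 289}, timeout_enabled=False)
print(no_timeout["IBM"])    # {'volume': 289}

with_timeout: Dict[Any, dict] = {}
buffer_tuple(with_timeout, "IBM", {"volume": 300}, timeout_enabled=True)
buffer_tuple(with_timeout, "IBM", {"volume": 289}, timeout_enabled=True)
print(with_timeout["IBM"])  # {'volume': 300}
```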
If you enable Timeout, you can also enable another option, Output arrival time of last tuple. This option adds a field called last_time to the output schema. This field is populated at runtime with the latest time at which an input tuple was received corresponding to that output tuple. The last_time field is always sorted across output tuples, so that it can reliably be passed to Aggregate operators; to ensure this sorted condition, Gather keeps a full timeout interval's worth of data buffered at all times.
This means that the timeout is not guaranteed to occur when the specified time elapses, as you might expect. Instead, the timeout occurs only when the time field on all the input ports has advanced by the specified interval, ensuring that tuples can be sorted correctly.
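The hold-and-sort behavior described above can be sketched with a min-heap keyed on last_time. This is an assumed model, not StreamBase internals. The `LastTimeReleaser` class is hypothetical, and it treats the operator's current time as the minimum of the latest times seen on all input ports. A completed gather is released only after that time has moved more than one timeout interval past its last_time.

```python
import heapq
from typing import List, Optional, Tuple


class LastTimeReleaser:
    """Hold completed gathers and release them in last_time order (assumed model)."""

    def __init__(self, num_ports: int, interval: float) -> None:
        self.interval = interval
        self.latest: List[Optional[float]] = [None] * num_ports
        self.pending: List[Tuple[float, int, dict]] = []  # min-heap of (last_time, seq, output)
        self._seq = 0  # tie-breaker so the heap never compares dicts

    def complete(self, last_time: float, output: dict) -> None:
        """Record a finished gather; it is held, not emitted."""
        heapq.heappush(self.pending, (last_time, self._seq, output))
        self._seq += 1

    def advance(self, port: int, time: float) -> List[dict]:
        """Record a tuple's time on `port`; return outputs that may now be released."""
        self.latest[port] = time
        if any(t is None for t in self.latest):
            return []  # time not yet established on every port
        now = min(t for t in self.latest if t is not None)
        released = []
        while self.pending and now - self.pending[0][0] > self.interval:
            last_time, _, output = heapq.heappop(self.pending)
            released.append({**output, "last_time": last_time})
        return released


# Hypothetical gathers that completed out of last_time order.
r = LastTimeReleaser(num_ports=2, interval=10.0)
r.complete(7.0, {"symbol": "B"})
r.complete(5.0, {"symbol": "A"})
print(r.advance(0, 30.0))  # []: no time on port 1 yet
print(r.advance(1, 30.0))  # [{'symbol': 'A', 'last_time': 5.0}, {'symbol': 'B', 'last_time': 7.0}]
```

Completed gathers wait in the heap until the slowest port has moved a full interval past them. This is what allows them to be released in last_time order even when they completed out of order.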
The Output Settings tab allows you to specify the field names and expressions the Gather operator should release. The release occurs when the Gather operator has seen a tuple with a specific key value on each of the streams.
Specify output fields using one of the two Output options:
-
Choose all input fields option to automatically pass all input fields through to the output stream.
By default, this setting adds prefixes to the names of input fields, as shown in the following screen:
Notice that a different prefix is specified for fields coming from each input stream, to guarantee unique output field names. Thus, if there are five input streams, five different prefixes are generated. You can change the prefixes, but they must be unique: an error message warns you if any two are the same.
-
Choose explicitly specified fields to specify the output fields manually.
With the explicit option selected, the Output Fields table is initially empty. Add a row for each output field you want, specifying the Output Field Name and its Expression. Alternatively, use the Pass All button to load any or all of the input fields into the table.
To avoid ambiguity in expressions when there are duplicate input field names in different streams, add to each field name a prefix that indicates the input stream it comes from. Use the syntax inputN.input_field, where N is the Gather operator's input port number. For example, the following screen contains expressions that refer to fields in three input streams:
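As a rough model of the two output options, the Python sketch below does two things. For the all-input-fields option, it builds prefixed names and rejects duplicate prefixes. For the explicit option, it resolves inputN.field references. Both helper names, the a_ and b_ prefixes, and the sample values are hypothetical. The real default prefixes are whatever the Properties view generates, and StreamBase evaluates expressions itself. The sketch also assumes that ports are numbered from 1.

```python
from typing import Any, Dict, List


def prefixed_output_fields(stream_fields: List[List[str]], prefixes: List[str]) -> List[str]:
    """'All input fields' option: one prefix per input stream (hypothetical helper)."""
    if len(set(prefixes)) != len(prefixes):
        raise ValueError("each input stream needs a unique prefix")
    return [p + f for p, fields in zip(prefixes, stream_fields) for f in fields]


def resolve(ref: str, inputs: Dict[int, dict]) -> Any:
    """Explicit option: look up a reference such as 'input2.volume' (hypothetical helper)."""
    stream, field = ref.split(".", 1)
    if not stream.startswith("input"):
        raise ValueError(f"expected inputN.field, got {ref!r}")
    return inputs[int(stream[len("input"):])][field]


print(prefixed_output_fields([["symbol", "volume"], ["symbol", "volume"]], ["a_", "b_"]))
# ['a_symbol', 'a_volume', 'b_symbol', 'b_volume']

# Assumed 1-based port numbers.
ports = {1: {"symbol": "IBM", "volume": 300}, 2: {"symbol": "IBM", "volume": 350}}
print(resolve("input1.volume", ports) + resolve("input2.volume", ports))  # 650
```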
The Dynamic Variables tab allows you to define variables for this operator that can then be used in one of its expressions. A dynamic variable can be updated by any input stream or output stream in your application. For more information, see Using Dynamic Variables.
- Run this component in a separate thread
-
This option causes the server to process the component's requests concurrently with other processing in the application. On an SMP machine, the threads can be distributed automatically across multiple processors.
If this is a compute-intensive component and you know that it can run without data dependencies on other components in the StreamBase application, you may be able to improve performance by enabling this option.
Caution
These features are not suitable for every application. For details, see Execution Order, Concurrency, and Parallelism, which includes important guidelines for the use of these features.
- Run in parallel threads
-
If you checked the first option, you can also choose this option, which causes the server to run multiple instances of this component. That is, each instance runs in its own thread. At run time, tuples are dispatched to particular instances based on the Key Expression value (which must evaluate to an int).
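To make the dispatch rule concrete, here is a hypothetical Python sketch. The modulo mapping from the int Key Expression value to an instance index is an assumption made for illustration. The documentation only says that dispatch is based on the key value. What the sketch shows is the property that matters: every tuple with the same key value reaches the same instance.

```python
from collections import defaultdict
from typing import Dict, List


def dispatch(keys: List[int], num_instances: int) -> Dict[int, List[int]]:
    """Group int key values by the instance that would receive them.

    The modulo mapping is an assumption for illustration only.
    """
    by_instance: Dict[int, List[int]] = defaultdict(list)
    for k in keys:
        by_instance[k % num_instances].append(k)  # equal keys always map to the same instance
    return dict(by_instance)


print(dispatch([7, 3, 7, 10, -2], num_instances=3))  # {1: [7, 7, 10, -2], 0: [3]}
```

For a Gather operator, this presumably means the Key Expression should be derived from the gather key. Otherwise, tuples that must be matched with each other could be split across instances and never meet.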
-
In an operation that performs sorting, any tuple with a null value in the ordering field or in a Boolean expression will be ignored.
-
If the evaluation of a predicate results in a NullValueException error, the tuple will be dropped.
-
If this component contains a Group Options tab, any null value in a Group By expression will be grouped.
For more information, see Using Nulls.
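The three null rules above can be modeled in Python as follows. In this model, `NullValueException` is a hypothetical stand-in class, Python's `None` plays the role of a StreamBase null, and tuples are plain dicts. The sketch illustrates the rules and is not StreamBase code.

```python
from typing import Any, Callable, Dict, List, Optional


class NullValueException(Exception):
    """Hypothetical stand-in for the error raised when a predicate evaluates a null."""


def sort_ignoring_nulls(tuples: List[dict], field: str) -> List[dict]:
    """Sorting: tuples whose ordering field is null are left out entirely."""
    return sorted((t for t in tuples if t.get(field) is not None), key=lambda t: t[field])


def filter_dropping_errors(tuples: List[dict], pred: Callable[[dict], bool]) -> List[dict]:
    """Predicates: a tuple whose evaluation raises NullValueException is dropped."""
    kept = []
    for t in tuples:
        try:
            if pred(t):
                kept.append(t)
        except NullValueException:
            pass  # the tuple is dropped
    return kept


def group_including_nulls(tuples: List[dict], field: str) -> Dict[Optional[Any], List[dict]]:
    """Group By: null is a group key like any other value."""
    groups: Dict[Optional[Any], List[dict]] = {}
    for t in tuples:
        groups.setdefault(t.get(field), []).append(t)
    return groups


def price_over_100(t: dict) -> bool:
    if t["price"] is None:
        raise NullValueException("price is null")
    return t["price"] > 100


rows = [{"sym": "A", "price": 120}, {"sym": "B", "price": None}, {"sym": None, "price": 90}]
print(sort_ignoring_nulls(rows, "price"))            # the 90 row, then the 120 row; B is ignored
print(filter_dropping_errors(rows, price_over_100))  # only the A row
print(list(group_including_nulls(rows, "sym")))      # ['A', 'B', None]
```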
Consider a simple application with a Gather operator that has two inputs. Both input streams receive tuples containing a stock symbol and a volume of shares. The inputs can be asynchronous: tuples may arrive at different rates on each stream. Using the stock symbol field as the key, the Gather operator sums the volume of matching stocks from the two input streams, and emits a tuple listing the stock's symbol and its summed volume. Note that the timeout option is not enabled. Let's trace a series of tuples flowing into the Gather operator in this application.
In the diagrams below, each step depicts a new tuple being enqueued to the Gather operator, and any output that results. The contents of the two operator buffers are shown after each tuple arrives and after any tuples are released.
-
A tuple enters the Gather operator on the first input port and is stored in the buffer:
-
A second tuple enters on the first port. Nothing is emitted by the Gather operator because both buffered tuples entered on the same input port, and because the symbols don't match:
-
A tuple enters on the second port. This time, the conditions for the gather operation are met: the key field is matched by tuples on both input ports. In the gather operation, the volumes of the AAPL tuples are summed, the result is emitted in an output tuple, and the matching tuples are released from the buffer.
-
This step introduces the problem that can be caused by duplicate keys on the same stream. The new tuple's key field (IBM) matches the key field of an existing tuple on the same port. As expected, there is no output because the match did not occur on both ports. But notice in the figure that the new tuple replaces the existing tuple. In the next step, we will see how this affects the Gather result.
-
A tuple arriving on the second port matches a tuple on the first port, and Gather releases the result. Notice that the value of the output tuple is the sum of the volumes of the last two IBM tuples (289 and 350). The earlier IBM volume of 300 is not used: it is effectively lost in this Gather operation.
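The trace above can be replayed with a short Python sketch. The two-port buffering logic is a simplified model, not StreamBase code, and the ports are 0-based. The AAPL volumes are hypothetical because the figures are not reproduced here. The sketch also assumes that 289 arrived on the first port and 350 on the second; the sum is the same either way.

```python
from typing import Dict, List, Optional, Tuple


def run_gather(events: List[Tuple[int, str, int]]) -> List[Tuple[str, int]]:
    """Replay (port, symbol, volume) arrivals through a two-port Gather without timeout.

    The output expression sums the volumes of the two matching tuples.
    """
    buffers: Dict[str, List[Optional[int]]] = {}  # symbol -> [first-port volume, second-port volume]
    outputs: List[Tuple[str, int]] = []
    for port, symbol, volume in events:
        slots = buffers.setdefault(symbol, [None, None])
        slots[port] = volume  # a duplicate key on the same port overwrites the buffered tuple
        first, second = slots
        if first is not None and second is not None:
            outputs.append((symbol, first + second))
            del buffers[symbol]  # matched tuples are released from the buffer
    return outputs


events = [
    (0, "AAPL", 100),  # Step A (hypothetical volume)
    (0, "IBM", 300),   # Step B
    (1, "AAPL", 200),  # Step C: AAPL matched on both ports (hypothetical volume)
    (0, "IBM", 289),   # Step D: replaces the buffered IBM 300
    (1, "IBM", 350),   # Step E: IBM matched on both ports
]
print(run_gather(events))  # [('AAPL', 300), ('IBM', 639)]
```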
To summarize this example:
-
No output is emitted until Step C, when the same key has been seen on both input ports. The output is the sum of the AAPL volumes that arrived on the Gather operator's first and second input ports.
-
Steps D and E show the potential side effect of duplicate keys on the same stream.
This example traces the execution of a Gather operator that has the timeout option set. The input schema has three fields: a stock symbol, its price, and its timestamp. The Gather operator has two input ports, each accepting stocks in a different price range (this might be accomplished by using a Filter operator upstream from the Gather operator). The first input port receives stocks priced at 200 or more; the second port receives stocks priced below 200. Like the first example, this operation gathers on the symbol field. Timeout is enabled with a timeout interval of 10 seconds, ordered by the timestamp field (this setting is shown in the Gather Settings Tab screen). Notice that the Output arrival time of last tuple option is not selected.
As in the first example, each step shows a new tuple being enqueued to the Gather operator, and any output that results. In these figures, each tuple also shows its timestamp field at the end.
-
A tuple enters the Gather operator on the second input port and is stored in the buffer. This arrival establishes the start time on the second port:
-
A tuple enters the Gather operator on the first input port and is stored in the buffer. Notice that this tuple arrives 11 seconds after the first tuple. Thus, the specified timeout interval has elapsed without a gather occurring:
-
The third tuple arrives on the second port, and the Gather operator emits two output tuples:
Why are two tuples emitted?
-
The input tuple on Port 1 triggers a gather because its key field (IBM) matches the key field on a tuple in the Port 2 buffer.
-
When the gather occurs, the timeout interval has already elapsed (the difference between the time values of the first and second tuples is greater than 10 seconds). This is why a second tuple is emitted, with the missing gather fields set to null.
If the third input had not matched on the key field, only the timeout tuple with the null fields would have been emitted.
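One way to reproduce the two outputs of the third step is sketched below in Python. The model assumes that the operator's current time is the minimum of the latest times seen on the two ports. A buffered key times out once that time is more than the interval past the key's first arrival. The symbols, prices, and times are hypothetical, since the figures are not reproduced here, and the output expression (symbol plus each port's price) is invented for illustration. Under this reading, the first tuple's key (MSFT in the sketch) never arrives on the other port. It therefore times out at the third step, in the same step that the IBM gather completes.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, Optional[float], Optional[float]]  # (symbol, first-port price, second-port price)


def run_timeout_gather(events: List[Tuple[int, dict]], interval: float) -> List[List[Row]]:
    """Replay (port, tuple) arrivals on a two-port Gather with timeout; outputs per step."""
    buffers: Dict[str, List[Optional[dict]]] = {}  # symbol -> [first-port, second-port] tuple
    start: Dict[str, float] = {}                   # symbol -> time it was first buffered
    latest: List[Optional[float]] = [None, None]   # latest time seen on each port
    steps: List[List[Row]] = []

    def emit(key: str) -> Row:
        first, second = buffers.pop(key)
        del start[key]
        # Missing input tuples contribute null (None) fields.
        return (key,
                first["price"] if first is not None else None,
                second["price"] if second is not None else None)

    for port, tup in events:
        key, t = tup["symbol"], tup["time"]
        latest[port] = t
        slots = buffers.setdefault(key, [None, None])
        start.setdefault(key, t)
        if slots[port] is None:  # with timeout enabled, a same-port duplicate is ignored
            slots[port] = tup
        out: List[Row] = []
        if all(s is not None for s in slots):
            out.append(emit(key))  # normal gather
        if None not in latest:  # current time is known on every port
            now = min(t for t in latest if t is not None)
            for k in [k for k, s in start.items() if now - s > interval]:
                out.append(emit(k))  # timeout
        steps.append(out)
    return steps


# Hypothetical data: ports are 0-based, so port 1 is the second port (prices below 200).
events = [
    (1, {"symbol": "MSFT", "price": 150.0, "time": 0.0}),
    (0, {"symbol": "IBM", "price": 210.0, "time": 11.0}),
    (1, {"symbol": "IBM", "price": 190.0, "time": 12.0}),
]
steps = run_timeout_gather(events, interval=10.0)
for i, out in enumerate(steps, 1):
    print(i, out)
# 1 []
# 2 []
# 3 [('IBM', 210.0, 190.0), ('MSFT', None, 150.0)]
```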
This example traces the execution of a Gather operator that has both the timeout and Output arrival time of last tuple options set. With these options, we are interested not only in the gather result, but also in the arrival time of the last tuple. Otherwise, the application in this example is identical to the one in Example 2: Gather with Timeout. Each step of this example shows a new tuple being enqueued to the Gather operator, and any output that results.
-
A tuple enters the Gather operator on the first input port and is stored in the buffer. This tuple establishes the start time on the first port:
-
A tuple enters the Gather operator on the second input port and is stored in the buffer. Notice that this tuple meets the normal criteria for a gather to occur: the symbols are matched on both input ports, and both tuples arrived within the timeout interval.
Why is no tuple emitted? As explained in Gather Settings Tab, the last_time value cannot be established until the timeout period expires, and the time is then updated on all of the Gather operator's input ports. So at this point, more input must arrive before a gather operation can occur.
-
A third tuple arrives on the second port. Notice that the elapsed time since the last gather tuple (15 seconds) exceeds the timeout interval. This establishes the elapsed time on the second port for purposes of the timeout:
-
When the fourth tuple arrives on the first port, Gather is able to establish the elapsed time since the last gather event. The operator emits a tuple with the gather result and the time value for the second tuple, which was the last tuple in the gather operation:
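This trace can be replayed with a hold-until-timeout rule, sketched in Python below. The sketch assumes that a matched row is released only when the minimum of the latest times on both ports is more than one interval past the row's last_time. It also assumes that last_time is the latest arrival time among the row's input tuples. The symbols, prices, and times are hypothetical. The second tuple is placed on the second port so that the symbols match on both ports, and it arrives at time 5. The third tuple arrives 15 seconds later, at time 20.

```python
import heapq
from typing import Dict, List, Optional, Tuple


def run_last_time_gather(events: List[Tuple[int, dict]],
                         interval: float) -> List[List[dict]]:
    """Replay (port, tuple) arrivals on a two-port Gather with timeout and last_time."""
    buffers: Dict[str, List[Optional[dict]]] = {}  # symbol -> [first-port, second-port] tuple
    start: Dict[str, float] = {}                   # symbol -> time it was first buffered
    latest: List[Optional[float]] = [None, None]   # latest time seen on each port
    held: List[Tuple[float, int, dict]] = []       # min-heap of (last_time, seq, row)
    seq = 0
    steps: List[List[dict]] = []

    def finish(key: str) -> None:
        """Move a matched or timed-out key from the buffers into the held heap."""
        nonlocal seq
        slots = buffers.pop(key)
        del start[key]
        last_time = max(s["time"] for s in slots if s is not None)
        row = {"symbol": key, "last_time": last_time,
               "prices": [s["price"] if s is not None else None for s in slots]}
        heapq.heappush(held, (last_time, seq, row))  # seq breaks ties without comparing dicts
        seq += 1

    for port, tup in events:
        key, t = tup["symbol"], tup["time"]
        latest[port] = t
        slots = buffers.setdefault(key, [None, None])
        start.setdefault(key, t)
        if slots[port] is None:  # with timeout enabled, a same-port duplicate is ignored
            slots[port] = tup
        if all(s is not None for s in slots):
            finish(key)  # matched, but not released yet
        out: List[dict] = []
        if None not in latest:  # current time is known on every port
            now = min(t for t in latest if t is not None)
            for k in [k for k, s in start.items() if now - s > interval]:
                finish(k)  # timed out: missing prices stay None
            while held and now - held[0][0] > interval:
                out.append(heapq.heappop(held)[2])
        steps.append(out)
    return steps


events = [
    (0, {"symbol": "IBM", "price": 210.0, "time": 0.0}),
    (1, {"symbol": "IBM", "price": 190.0, "time": 5.0}),
    (1, {"symbol": "MSFT", "price": 150.0, "time": 20.0}),
    (0, {"symbol": "AAPL", "price": 220.0, "time": 25.0}),
]
steps = run_last_time_gather(events, interval=10.0)
for i, out in enumerate(steps, 1):
    print(i, out)
# 1 []
# 2 []
# 3 []
# 4 [{'symbol': 'IBM', 'last_time': 5.0, 'prices': [210.0, 190.0]}]
```

The IBM match completes at the second step but stays held. It is released at the fourth step, when the first port finally moves far enough ahead for the current time (20) to exceed last_time (5) by more than the 10-second interval.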
