Using the Merge Operator

This topic explains how the Merge operator works and the actions you can take on its Properties view.

Introduction

The Merge operator compares tuples on two input streams, using an expression that specifies a field to sort on. It then combines the data, emitting one ordered output stream. The two input streams must have compatible schemas (with the same fields, data types and sizes, not necessarily in the same order). Tuples on the output stream also have the same schema as the input streams. Incoming tuples are sorted in increasing order. Optionally, tuples can also be grouped by one or more input fields. Any groups that you define are ordered independently when they are merged.
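
The schema requirement can be pictured with a small Python sketch. It is only an illustration: the dictionary representation of a schema and the names schemas_compatible, trades_nyse, trades_phlx, and quotes are assumptions made for this example, not part of the product.

```python
from typing import Dict

# Hypothetical representation: field name -> type label (a label may carry a size).
Schema = Dict[str, str]


def schemas_compatible(left: Schema, right: Schema) -> bool:
    """Return True when both schemas have the same fields and types, in any order."""
    # Dictionary equality ignores insertion order, which mirrors
    # "not necessarily in the same order".
    return left == right


trades_nyse = {"symbol": "string(8)", "trade_time": "timestamp"}
trades_phlx = {"trade_time": "timestamp", "symbol": "string(8)"}
quotes = {"symbol": "string(8)", "trade_time": "timestamp", "bid": "double"}

same_fields_other_order = schemas_compatible(trades_nyse, trades_phlx)
extra_field = schemas_compatible(trades_nyse, quotes)
```

In this sketch the first comparison succeeds because only the field order differs, and the second fails because one schema has an additional field.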

The Merge operator stores arriving tuples in a buffer for each input port. It emits tuples when a new tuple's value (based on the field that was selected to merge on) is greater than or equal to the oldest tuple in the other buffer. If the group option has been selected, the tuples must also evaluate to the same group.
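
The buffering rule can be sketched in Python as follows. This is a simplified model, not the operator's actual implementation: it assumes each input stream is already ordered on the merge field, and the class name TwoWayMerge and its members are hypothetical.

```python
from collections import deque
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


class TwoWayMerge:
    """Simplified model of a two-port ordered merge (illustrative only)."""

    def __init__(self, key: Callable[[Record], Any]) -> None:
        self.key = key
        self.buffers = (deque(), deque())  # one FIFO buffer per input port
        self.latest = [None, None]         # latest merge value seen on each port

    def enqueue(self, port: int, tup: Record) -> List[Record]:
        """Buffer tup on port 0 or 1 and return the tuples that can be released."""
        self.buffers[port].append(tup)
        self.latest[port] = self.key(tup)
        if None in self.latest:
            return []  # one port has not received data yet, so nothing is safe to emit
        # A buffered tuple is safe once the other port has reached its value.
        safe = min(self.latest)
        released: List[Record] = []
        for buf in self.buffers:
            while buf and self.key(buf[0]) <= safe:
                released.append(buf.popleft())
        released.sort(key=self.key)  # emit in increasing order of the merge field
        return released


merge = TwoWayMerge(key=lambda t: t["time"])
out1 = merge.enqueue(1, {"symbol": "IBM", "time": 1})  # port 0 has seen nothing yet
out2 = merge.enqueue(0, {"symbol": "IBM", "time": 3})  # releases the tuple with time 1
```

Here the second call releases the older tuple from the other buffer and retains the new one, because nothing on the other port has yet reached its value.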

The Merge operator is order sensitive. For example, to combine trades coming from the New York Stock Exchange and the Philadelphia Stock Exchange, you could use either a Union operator or a Merge operator. If the combined data must stay ordered by trade date, use the Merge operator. If order is not important, you can use the Union operator.

Tip

When you use the Merge operator, you should expect all of your streams to receive data. This is important because the merge operation waits for data on all input ports, in order to guarantee that the output is correctly merged. For example, if one of your streams has a very low data rate, the other streams might buffer more than you intended. In this case, consider adding a Heartbeat operator upstream from the Merge operator. The Heartbeat operator can detect when tuples do not arrive at a specified interval and emit its own tuple, causing the Merge operator to flush buffered data.
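
The effect described in this tip can be illustrated with a minimal Python sketch. It models only the waiting behavior: the function releasable and the idea that a heartbeat tuple carries a merge value of 4 are assumptions made for the example, not a description of how the Heartbeat operator is configured.

```python
def releasable(buffered_values, other_port_latest):
    """Split buffered merge values into (released, still_buffered).

    other_port_latest is the latest merge value seen on the other port,
    or None if that port has been silent.
    """
    if other_port_latest is None:
        return [], list(buffered_values)
    released = [v for v in buffered_values if v <= other_port_latest]
    kept = [v for v in buffered_values if v > other_port_latest]
    return released, kept


busy_port = [1, 2, 3, 4, 5]  # merge values buffered on the busy port

# The quiet port has sent nothing, so every tuple stays buffered.
stalled = releasable(busy_port, None)

# A hypothetical heartbeat tuple with merge value 4 arrives on the quiet port.
after_heartbeat = releasable(busy_port, 4)
```

In this model the buffer keeps growing while the quiet port is silent, and a single tuple on that port is enough to release everything up to its value.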

This topic describes the actions you can take on each tab of the Merge operator's Properties View, and illustrates how tuples are merged.

General Tab

Name: Every application component must have a unique name. Use this field to specify or change the component's name. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
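
The character rules for names can be expressed as a regular expression. The following Python sketch assumes that "alphabetic" means the ASCII letters A through Z in either case. The function name is_valid_component_name is hypothetical, and the sketch does not check uniqueness.

```python
import re

# First character: a letter or underscore. Remaining characters: letters, digits, underscores.
_NAME_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_component_name(name: str) -> bool:
    """Return True when name follows the character rules described above."""
    return _NAME_RE.fullmatch(name) is not None


checks = {
    "MergeTrades_1": is_valid_component_name("MergeTrades_1"),
    "_merge": is_valid_component_name("_merge"),
    "1Merge": is_valid_component_name("1Merge"),
    "merge-trades": is_valid_component_name("merge-trades"),
}
```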

Enable Error Output Port: Check this box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally, enter a description to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Merge Settings Tab

The Merge Settings tab allows you to specify the field on which tuples should be merged. This field is used to evaluate the order of the tuples on the two input streams. See Example 1: Simple Merge in this topic to trace a merge operation.

Group Options Tab

The Group Options tab allows you to specify tuple groupings based on one or more fields in the input streams. Add a row for each group and identify its input field in the Expression field. See Example 2: Merge with Grouping in this topic to trace a merge operation that uses the group option.
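
One way to picture grouping is that each group is merged independently, with its own pair of buffers. The following Python sketch models that idea under the assumption that each input stream is ordered on the merge field within each group. The class GroupedMerge and its members are hypothetical, and the sketch is not the operator's actual implementation.

```python
from collections import defaultdict, deque


class GroupedMerge:
    """Simplified model: an independent two-port merge for every group value."""

    def __init__(self, key, group):
        self.key = key      # function returning the merge value of a tuple
        self.group = group  # function returning the group value of a tuple
        # Per group: two FIFO buffers and the latest merge value seen on each port.
        self.buffers = defaultdict(lambda: (deque(), deque()))
        self.latest = defaultdict(lambda: [None, None])

    def enqueue(self, port, tup):
        """Buffer tup on port 0 or 1 and return the tuples released for its group."""
        g = self.group(tup)
        self.buffers[g][port].append(tup)
        self.latest[g][port] = self.key(tup)
        if None in self.latest[g]:
            return []  # the other port has no data for this group yet
        safe = min(self.latest[g])
        released = []
        for buf in self.buffers[g]:
            while buf and self.key(buf[0]) <= safe:
                released.append(buf.popleft())
        released.sort(key=self.key)
        return released


merge = GroupedMerge(key=lambda t: t["time"], group=lambda t: t["symbol"])
a = merge.enqueue(1, {"symbol": "IBM", "time": 1})   # buffered
b = merge.enqueue(0, {"symbol": "MSFT", "time": 5})  # different group, so no release
c = merge.enqueue(0, {"symbol": "IBM", "time": 2})   # same group, releases time 1
```

In this sketch the MSFT tuple has a later timestamp than the buffered IBM tuple, but it releases nothing because the two tuples belong to different groups.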

Dynamic Variables Tab

The Dynamic Variables tab allows you to define variables for this operator that can then be used in one of its expressions. A dynamic variable can be updated by any input stream or output stream in your application. For more information, see Using Dynamic Variables.

Concurrency Tab

Run this component in a separate thread

This option causes the server to process the component's requests concurrently with other processing in the application. You can distribute the processing of the threads automatically across multiple processors on an SMP machine.

If this is a compute-intensive component and you know that it can run without data dependencies on other components in the StreamBase application, you may be able to improve performance by enabling this option.

Caution

These features are not suitable for every application. For details, see Execution Order, Concurrency, and Parallelism, which includes important guidelines for using these features.

Run in parallel threads

If you checked the first option, you can also choose this option, which causes the server to run multiple instances of this component. That is, each instance runs in its own thread. At run time, tuples are dispatched to particular instances based on the Key Expression value (which must evaluate to an int).
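
This topic does not state how a Key Expression value is mapped to an instance. The Python sketch below assumes a simple modulo rule purely for illustration. The function pick_instance is hypothetical, and the actual dispatch rule may differ.

```python
def pick_instance(key_value: int, instance_count: int) -> int:
    """Map an integer key to an instance index (assumed modulo rule)."""
    if instance_count < 1:
        raise ValueError("instance_count must be at least 1")
    # In Python, % with a positive divisor always yields a non-negative result.
    return key_value % instance_count


first = pick_instance(7, 4)
again = pick_instance(7, 4)      # the same key always maps to the same instance
negative = pick_instance(-1, 4)
```

Whatever the actual rule, the property that matters is the one shown here: tuples with the same key value are always handled by the same instance.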

Null Values

  • In an operation that performs sorting, any tuple with a null value in the ordering field or in a Boolean expression is ignored.

  • If the evaluation of a predicate results in a NullValueException error, the tuple will be dropped.

  • If this component contains a Group Options tab, any null value in a Group By expression will be grouped.

    For more information, see Using Nulls.
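
The first and third rules can be illustrated with a short Python sketch in which None stands for a null value. The helper names split_nulls and group_key are hypothetical, and reading "will be grouped" as "tuples with a null group value form a group of their own" is an assumption of this example.

```python
def split_nulls(tuples, order_field):
    """Separate tuples that can be ordered from those with a null ordering field."""
    usable = [t for t in tuples if t.get(order_field) is not None]
    ignored = [t for t in tuples if t.get(order_field) is None]
    return usable, ignored


def group_key(tup, group_fields):
    """Build a group key. None values are kept, so such tuples share a group."""
    return tuple(tup.get(f) for f in group_fields)


rows = [
    {"symbol": "IBM", "time": 1},
    {"symbol": "IBM", "time": None},  # null ordering field: ignored
    {"symbol": None, "time": 2},      # null group value: still grouped
]
usable, ignored = split_nulls(rows, "time")
null_group = group_key(rows[2], ["symbol"])
```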

Example 1: Simple Merge

Consider an application that performs a simple Merge operation where the input schema contains a stock symbol field (string data type) and a timestamp field. The operation merges on the timestamp field. That is, we want the output stream to merge the two input streams with the tuples sorted by time. Keeping in mind the earlier introduction to the Merge operation, let's trace some data flowing through the application.

Each step of this example shows a new tuple being enqueued to the Merge operator, and any output tuples that are released. The contents of the two operator buffers are shown after each tuple arrives and after any tuples are released.

  1. As we begin the trace, a tuple has already been enqueued on the second stream and is stored in the Merge buffer. Now, another tuple arrives on the second input stream. No output occurs because both tuples are on the same stream.

  2. A tuple arrives on the first stream, causing the Merge operator to evaluate the new tuple against each buffered tuple from the other stream. Because both existing tuples have a lower value based on their timestamps, Merge releases them on its output stream. The new tuple is retained in the buffer.

  3. Another tuple arrives on the first stream. Again, only one buffer contains data, so no output occurs.

  4. A tuple arrives on the second stream. Notice that its value is between those of the existing values on the other stream. The result is that two tuples are emitted: the new one and the existing one of lower value from the other port's buffer.

  5. A tuple arrives on the first stream, with no output. Note that its value is the same as that of the existing tuple.

  6. A new tuple on the second stream has the same value as both tuples stored in the first port's buffer. In this case, Merge releases all three tuples of equal value to the output stream.
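
The six steps above can be replayed with a short Python sketch. The timestamps 1 through 5 are made-up values chosen to match the relationships described in each step, the symbol field is omitted because it plays no part in an ungrouped merge, and the enqueue function is a simplified model rather than the operator's actual implementation.

```python
from collections import deque

buffers = {"first": deque(), "second": deque()}  # one buffer per input port
latest = {"first": None, "second": None}         # latest timestamp seen per port


def enqueue(port, time):
    """Buffer a timestamp on the given port and return the timestamps released."""
    buffers[port].append(time)
    latest[port] = time
    if None in latest.values():
        return []  # one port has not received data yet
    safe = min(latest.values())
    out = []
    for buf in buffers.values():
        while buf and buf[0] <= safe:
            out.append(buf.popleft())
    return sorted(out)


enqueue("second", 1)  # already buffered before the trace begins

trace = [
    enqueue("second", 2),  # step 1: same stream, no output
    enqueue("first", 3),   # step 2: releases 1 and 2, retains 3
    enqueue("first", 5),   # step 3: only one buffer holds data, no output
    enqueue("second", 4),  # step 4: releases 3 and the new tuple 4
    enqueue("first", 5),   # step 5: equal to the buffered value, no output
    enqueue("second", 5),  # step 6: releases all three tuples with value 5
]
```

With these values, trace holds one list of released timestamps per step: [], [1, 2], [], [3, 4], [], and [5, 5, 5].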

Example 2: Merge with Grouping

In this example we will trace the effects of the same input as in Example 1: Simple Merge, explaining how grouping changes the way Merge works. The schema and the input are the same, and we will still merge on the timestamp. But we will also group on the symbol field. That is, we want tuples to be organized by stock symbol, and sorted by timestamp within each group.

  1. As in Example 1, we begin with two tuples from the second input stream. As before, there are no tuples in the first port's buffer, so no output has occurred.

  2. When the next tuple arrives on the first stream, the same condition applies as in Example 1: the two tuples on the second port have earlier timestamps. However, no merge occurs this time, because now we want to group tuples by symbol. None of the tuples buffered on the second port matches the symbol of the incoming tuple, so the new tuple is buffered while the Merge operator waits for more input.

  3. A new tuple on the second stream meets the merge-by-group conditions: the two tuples in the first port's buffer have smaller timestamp values, and their symbol fields match the incoming tuple's symbol field. Therefore, the Merge operator releases a sorted group of tuples with that symbol on its output stream.

  4. Another tuple arrives on the second stream. Compare the result with the same step in Example 1: even though the first buffer contains a tuple with a lower value, no output occurs this time, because the symbols are different.

  5. A tuple arrives on the first stream. An existing tuple on the other port's buffer has the same symbol and a lower time value, so a merge occurs: the existing tuple is released.

  6. The last tuple, arriving on the first stream, causes no merge because the second port's buffer previously released all its tuples. Recall that in Example 1, the second port's buffer still contained a tuple with a matching symbol at this point, and a merge occurred as a result.