Developers: Adding a Unique ID Field to a Tuple's Schema

Authors: Simon Keen, Dr. John Lifter
StreamBase Systems
23-March-2007

Introduction 

There are many cases within a StreamBase application when it is necessary to uniquely identify a tuple. For example, there might be a need for an identifier that will be passed to an application outside of StreamBase, or the identifier may be needed to utilize the functionality inherent in a gather, merge, join, or aggregate EventFlow operator or their equivalent StreamSQL statements.

There are a number of approaches that can be used to add a unique ID field to a tuple's schema, including simple, custom, or aggregate functions, dynamic variables, and query tables. Some of these techniques are better implemented as a collection of EventFlow operators, while others are better implemented as StreamSQL statements.

Whichever approach you choose, you want your implementation to be reusable and extensible. That is, you want the option of implementing this functionality as a module that can be used in many different applications, ideally without modification, and you want to be able to generate IDs that are unique within an application even if there are multiple points in the application where IDs are added to a tuple.

The examples discussed in this article were developed with StreamBase 3.7.1.

Test Harness 

As you evaluate each approach, you want to convince yourself that your goals of reusability and extensibility have been achieved. This article discusses both EventFlow and StreamSQL implementations of each approach. Each implementation is developed as an independent module, and then one or more instances of the module are used within another StreamBase application. To illustrate reusability, the schemas of the enclosed module and the enclosing application differ.

Using a Function to Generate a Unique ID 

Existing simple and aggregate functions, as well as custom simple functions that you may write, can be used to generate a unique ID value. The following sections give details of these approaches.

Simple Function

Perhaps the simplest approach is to use a timestamp value, converted to a double data type, as the unique ID. The current system time, obtainable through the now() function, provides an increasing value that meets the requirements of a unique ID. This function is available to modules developed using either the EventFlow or StreamSQL approaches. This approach does have one significant limitation, which stems from the resolution that StreamBase employs with timestamp data: the smallest timestamp interval is 1-2 milliseconds. Consequently, if there is a possibility that multiple tuples will arrive within the same 1-2 milliseconds, this approach will not assign unique ID values to every tuple.
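The resolution limit can be illustrated outside StreamBase. The following is a minimal Java sketch, not StreamBase code: the class name, the toSeconds helper (a stand-in for to_seconds(now()), whose exact behavior is assumed), and the arrival times are all hypothetical. It counts how many tuples would receive an ID that an earlier tuple already holds when several tuples share one millisecond timestamp.

```java
import java.util.HashSet;
import java.util.Set;

final class TimestampIdCollisions {

    // Converts a millisecond timestamp to fractional seconds. This is an
    // assumed stand-in for to_seconds(now()), not the StreamBase function.
    static double toSeconds(long epochMillis) {
        return epochMillis / 1000.0;
    }

    // Counts tuples whose timestamp-derived ID repeats an earlier ID.
    static int countDuplicateIds(long[] arrivalMillis) {
        Set<Double> seen = new HashSet<>();
        int duplicates = 0;
        for (long millis : arrivalMillis) {
            if (!seen.add(toSeconds(millis))) {
                duplicates++;
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Hypothetical arrivals: two tuples share 1000 ms, three share 1003 ms.
        long[] arrivals = {1000L, 1000L, 1001L, 1003L, 1003L, 1003L};
        System.out.println("duplicate IDs: " + countDuplicateIds(arrivals));
    }
}
```

Any burst of tuples that falls inside one timestamp interval produces duplicates in the same way, which is why this approach suits only low-rate streams.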

To implement this approach as an EventFlow module, simply use the map operator to add a field named uniqueID with the value to_seconds(now()) to each tuple. To ensure reusability, be certain to select the all input fields with specified changes radio button on the Output Settings tab of the map operator's Properties view. Your module should include this operator, an input stream, and an output stream; the schema you assign to the input stream is unimportant as it will be overridden when the module is incorporated into another application. These configuration settings are illustrated in the following figures.

  • The EventFlow Module — simplefunction.sbapp:
  • Simple Function Based Unique ID Module

  • The Input Stream Schema:
  • Simple Function Based Unique ID Module - Input Stream Schema

  • The Map Operator Output Settings Tab:
  • Simple Function Based Unique ID Module - Map Operator Output Settings

If you run this module, you will be able to confirm that a field named uniqueID containing a unique, increasing value is added to each tuple. Integrate this module into another EventFlow application, as shown in the following figures.

  • The EventFlow Application:
  • Simple Function Based Application

  • The Input Stream Schema:
  • Simple Function Based Application - Input Stream Schema

  • The Module Reference Input Ports Tab:
  • Simple Function Based Application - Module Reference Input Ports

A StreamSQL implementation of the simple function based approach is illustrated in the following StreamSQL fragments.

  • The StreamSQL Module — simplefunction.ssql:
  •     CREATE INPUT STREAM InputStream (required_butUnused_SchemaField int);
    
        CREATE OUTPUT STREAM OutputStream AS
          SELECT *, to_seconds(now()) AS uniqueID FROM InputStream;
      
  • The StreamSQL Application:
  •     CREATE INPUT STREAM in (name string(10), age int, salary double);
        CREATE OUTPUT STREAM out;
    
        APPLY MODULE "simplefunction.ssql"
          FROM InputStream = in
          INTO OutputStream = out;
      

You can, of course, reference the EventFlow module, simplefunction.sbapp, in the StreamSQL application or use the StreamSQL module, simplefunction.ssql, in the EventFlow application. This interchangeability applies to all of the other approaches discussed in this article, even where it is not explicitly noted.

You can also instantiate multiple instances of the module within your enclosing application, as shown in the following StreamSQL code fragment where the module may be either simplefunction.ssql or simplefunction.sbapp. Each time a tuple is passed to an instance of the module, a unique ID value is added to the emitted tuple.

    CREATE INPUT STREAM in (name string(10), age int, salary double);
    CREATE OUTPUT STREAM out;

    APPLY PARALLEL 2 MODULE "simplefunction.ssql"
      FROM InputStream = in
      INTO OutputStream = out;

Configuring an EventFlow module reference so that multiple instances are instantiated is accomplished on the Concurrency tab of the reference's Properties view. You must select both the Run this component in a separate thread and Run in parallel threads check boxes. Then use the Number of threads: text box to set the number of threads, which equates to the number of module instances instantiated.

Simple Function Based Application - Module Reference Concurrency

See the article Data Parallelism in StreamBase Applications for a discussion of the APPLY PARALLEL MODULE clause and the Concurrency tab.

Custom Simple Function

Another approach is to write a custom simple function in Java that returns either an integer or double value as the unique ID. This approach is equally suitable for StreamBase applications developed using either the EventFlow or StreamSQL paradigm. When a double type is used, the approach offers a large range of values. When the module is deployed multiple times within the same application, this approach does not generate duplicate values, because every module instance calls the same static method and therefore shares one counter.

To extend StreamBase with a Java custom simple function, you simply write a class that includes a public static method that provides the functionality you need. The following class will implement a counter that returns a value suitable for use as a unique ID; if a more complex ID is required, your Java class could return a string identifier such as a GUID or UUID.

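    package com.training;

    public class Counter {

        private static int counter = 0;

        // synchronized so that module instances running in parallel threads
        // cannot receive the same value from a concurrent increment.
        public static synchronized int incrementCount() {
            return ++counter;
        }
    }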
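For the string-identifier option mentioned above, a minimal sketch using the standard java.util.UUID class might look like the following. The class and method names are hypothetical, and whether a string-valued ID suits the rest of your application is an assumption to verify; note that the canonical UUID text form is 36 characters long, so the string field receiving it must be declared wide enough.

```java
import java.util.UUID;

// Hypothetical helper; to call it from StreamBase it would presumably be
// declared public in its own source file, as Counter is.
final class UuidGenerator {

    // Returns a random (version 4) UUID in its canonical 36-character form.
    public static String nextId() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println(nextId());
    }
}
```

Unlike the counter, this variant keeps no shared state, so it needs no synchronization; the trade-off is that the IDs are not ordered.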

You can test this custom class by modifying the modules described in the preceding section, replacing the expression to_seconds(now()) with the expression calljava("com.training.Counter", "incrementCount").

  • The Map Operator Output Settings Tab:
  • Custom Function Based Unique ID Module - Map Operator Output Settings

  • The StreamSQL Module — customsimplefunction.ssql
  •     CREATE INPUT STREAM InputStream (required_butUnused_SchemaField int);
    	  
        CREATE OUTPUT STREAM OutputStream AS
          SELECT *, calljava("com.training.Counter","incrementCount") AS uniqueID
            FROM InputStream;
    	

You can test the extensibility of this approach with an EventFlow application or with the following StreamSQL application, which instantiates two instances of the module and distributes incoming tuples across both instances.

  • The StreamSQL Application:
  •     CREATE INPUT STREAM in (name string(10), age int, salary double);
        CREATE OUTPUT STREAM out;
    
        APPLY PARALLEL 2 BY age MODULE "customsimplefunction.ssql"
          FROM InputStream = in
          INTO OutputStream = out;
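Because parallel module instances run in separate threads, a shared counter yields unique values only if each increment is atomic. The following Java sketch is an illustration rather than StreamBase code: it uses two plain Java threads to stand in for the two module instances, and an AtomicInteger in place of a plain int field. The class name and the distinctIds helper are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class AtomicCounterDemo {

    private static final AtomicInteger counter = new AtomicInteger(0);

    // Atomic equivalent of ++counter: safe when called from several threads.
    public static int incrementCount() {
        return counter.incrementAndGet();
    }

    // Starts the given number of threads, each requesting idsPerThread IDs,
    // and returns how many distinct IDs were handed out in total.
    static int distinctIds(int threads, int idsPerThread) {
        Set<Integer> ids = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < idsPerThread; i++) {
                    ids.add(incrementCount());
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return ids.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct IDs: " + distinctIds(2, 10000));
    }
}
```

If the number of distinct IDs equals the number of requests, no two callers received the same value. An unsynchronized int field gives no such guarantee under concurrent calls.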
    	

Aggregate Function

The aggregate function count() provides another option for generating monotonically increasing ID values. Since this is an aggregate function, its use is limited to the EventFlow aggregate and query operators or to a StreamSQL SELECT statement applied to a windowed stream, table, or materialized window. This function returns an integer value, whereas both to_seconds(now()) and the custom simple function discussed in the preceding sections can return a double value. This approach is better suited to applications developed using the EventFlow paradigm; in StreamSQL it is not possible to write a fully reusable module because the names of the incoming fields are altered. The technique works perfectly well in StreamSQL if you incorporate its code directly in your primary StreamSQL application.

In an EventFlow module, configure an aggregate operator as illustrated in the following screen shots.

  • The EventFlow Module — aggregatefunction.sbapp:
  • Aggregate Function Based Unique ID Module

  • The Input Stream Schema:
  • Aggregate Function Based Unique ID Module - Input Stream Schema

  • The Aggregate Operator Dimensions Tab:
  • Aggregate Function Based Unique ID Module - Aggregate Operator Dimensions Tab

  • The Aggregate Operator Aggregate Functions Tab:
  • Aggregate Function Based Unique ID Module - Aggregate Operator Aggregate Functions Tab

The critical settings that enable this approach are the dimension specification, which creates a single window that never closes yet emits a tuple as each new tuple arrives (Window Size and Advance undefined; Emit = 1), and the directive to place all field values from the most recently received tuple into the emitted tuple (Output all input fields, applying the lastval function ...).

To test this module, create an EventFlow application that uses this module as illustrated in the following screen shots.

  • The EventFlow Application:
  • Aggregate Function Based Application

  • The Input Stream Schema:
  • Aggregate Function Based Application - Input Stream Schema

  • The Module Reference Input Ports Tab:
  • Aggregate Function Based Application - Module Reference Input Ports

A StreamSQL implementation of the aggregate function based approach is illustrated in the following StreamSQL fragments.

  • The StreamSQL Module — aggregatefunction.ssql:
  •     CREATE INPUT STREAM InputStream (required_butUnused_SchemaField int);
    
        CREATE OUTPUT STREAM OutputStream AS
          SELECT lastval(*) AS _*, count() as uniqueID 
            FROM InputStream[SIZE 2147483647 ADVANCE 2147483647 TUPLES VALID ALWAYS];
      
  • The StreamSQL Application:
  •     CREATE INPUT STREAM in (name string(10), age int, salary double);
        CREATE OUTPUT STREAM out;
    
        APPLY MODULE "aggregatefunction.ssql"
          FROM InputStream = in
          INTO OutputStream = out;
      

In the aggregatefunction.ssql module, note that each field from the incoming tuple is emitted with an altered name: the underscore character (or any other characters you choose) is prepended to the original field name. As a consequence of this syntax constraint, the StreamSQL implementation of the aggregate function approach is not reusable without edits to the application using this module; that is, subsequent statements must refer to the modified field names rather than the original field names in the incoming tuple. The window specification ([SIZE 2147483647 ADVANCE 2147483647 TUPLES VALID ALWAYS]) sets the dimensions of the window to the maximum value of an integer data type. The window remains open until the value returned by count() reaches this maximum; then the window closes, another window opens, and the sequence of unique IDs repeats.
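The repetition at the window boundary can be seen with a small window. The following Java sketch is a simplified model of the behavior described above, not of the StreamBase engine itself; the class name is hypothetical, and a window size of 3 stands in for 2147483647.

```java
final class WindowedCount {

    private final int windowSize;
    private int count = 0;

    WindowedCount(int windowSize) {
        this.windowSize = windowSize;
    }

    // Mimics count() over a tuple window that emits on every tuple: once the
    // window has held windowSize tuples it closes, a new window opens, and
    // the tally restarts at 1.
    int next() {
        if (count == windowSize) {
            count = 0;
        }
        return ++count;
    }

    public static void main(String[] args) {
        WindowedCount ids = new WindowedCount(3);
        for (int i = 0; i < 7; i++) {
            System.out.print(ids.next() + " ");
        }
        System.out.println();
    }
}
```

With the maximum integer window size the repetition is far away, but an application that must never reuse an ID should still account for it.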

The aggregate function based approach, either as an EventFlow or StreamSQL implementation, is NOT SUITABLE in situations where you want to use multiple instances of the module in the same application. This is because each module instance creates its own copy of the functionality and maintains a separate tally from the count() function, so every instance issues the same sequence of ID values.
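The effect of separate tallies can be sketched in a few lines of Java. This is an illustrative model only: the class names are hypothetical, and round-robin distribution of tuples across instances is an assumed policy chosen to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

final class PerInstanceCountDemo {

    // One module instance with its own private tally, like a separate count().
    static final class ModuleInstance {
        private int count = 0;

        int assignId() {
            return ++count;
        }
    }

    // Sends tuples round-robin (an assumed policy) to the given number of
    // instances and returns the IDs in the order they were assigned.
    static List<Integer> assignIds(int instances, int tuples) {
        ModuleInstance[] modules = new ModuleInstance[instances];
        for (int i = 0; i < instances; i++) {
            modules[i] = new ModuleInstance();
        }
        List<Integer> ids = new ArrayList<>();
        for (int t = 0; t < tuples; t++) {
            ids.add(modules[t % instances].assignId());
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two instances, six tuples: each ID value is issued twice.
        System.out.println(assignIds(2, 6));
    }
}
```

The same reasoning applies to any approach in which each module instance holds its own private state for the last ID issued.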

Using a Dynamic Variable to Generate a Unique ID 

A dynamic variable is a value available to an EventFlow operator, or throughout a StreamSQL module, which may be used in an expression. For example, a dynamic variable may be used in setting the value of a field emitted by a map operator or in setting the value of a target list entry within a StreamSQL SELECT statement. In an EventFlow module, the value of a dynamic variable is set from the value of a tuple field at an input or output stream. In a StreamSQL module, the value of a dynamic variable is set from the value of a tuple field in any stream within the module. In an EventFlow module, it is possible to use field values in the tuples emitted by an operator to reset the value of a dynamic variable associated with this operator. In a StreamSQL module, it is possible to use field values from tuples on a stream to reset the value of a dynamic variable that was used in creating the stream.

To implement this approach as an EventFlow module, create the application diagram shown in the following figure. The critical steps (shown in the following screen shots) are the declaration of the dynamic variable (on the Dynamic Variables tab of the Map1 operator's Properties view) and the declaration of the schema associated with tuples emitted by the Map2 operator. Since the value of the dynamic variable is reset from a value available on an output stream, you must explicitly define the schema assigned to this stream before configuration of the module can be successfully completed (note that this arc is colored blue, rather than black, indicating that its schema has been explicitly defined). Tuples emitted on OutputStream2 are only used to reset the value of the dynamic variable; tuples emitted on OutputStream1 include fields from the tuples received on the input stream and the added unique ID value.

  • The EventFlow Module — dynamicvariable.sbapp:
  • Dynamic Variable Based Unique ID Module

  • The Input Stream Schema:
  • Dynamic Variable Based Unique ID Module - Input Stream Schema

  • The Map1 Operator Dynamic Variables Tab:
  • Dynamic Variable Based Unique ID Module - Map1 Operator Dynamic Variables Tab

  • The Map1 Operator Output Settings Tab:
  • Dynamic Variable Based Unique ID Module - Map1 Operator Output Settings Tab

  • The Map2 Operator Output Settings Tab:
  • Dynamic Variable Based Unique ID Module - Map2 Operator Output Settings Tab

  • The Map2 Operator Output Arc:
  • Dynamic Variable Based Unique ID Module - Map2 Operator Output Arc

To test this module, create an EventFlow application that uses this module as illustrated in the following screen shots.

  • The EventFlow Application:
  • Dynamic Variable Based Application

  • The Input Stream Schema:
  • Dynamic Variable Based Application - Input Stream Schema

  • The Module Reference Input Ports Tab:
  • Dynamic Variable Based Application - Module Reference Input Ports

A StreamSQL implementation of the dynamic variable based approach is illustrated in the following StreamSQL fragments.

  • The StreamSQL Module — dynamicvariable.ssql:
  •     CREATE INPUT STREAM InputStream (required_butUnused_SchemaField int);
        CREATE STREAM counterStream (count int);
    
        DECLARE c int DEFAULT 0
          UPDATE FROM (SELECT count FROM counterStream);
    
        SELECT *, c+1 AS count FROM InputStream =>
          CREATE OUTPUT STREAM OutputStream;
    
        SELECT count FROM OutputStream INTO counterStream;
      
  • The StreamSQL Application:
  •     CREATE INPUT STREAM in (name string(10), age int, salary double);
        CREATE OUTPUT STREAM out;
    
        APPLY MODULE "dynamicvariable.ssql"
          FROM InputStream = in
          INTO OutputStream = out;
      

Since a dynamic variable in a StreamSQL module may be set from a field value of a tuple on any stream, the same stream (OutputStream) can be used to reset the dynamic variable and to emit tuples from the module. This makes the StreamSQL module easier to use than the EventFlow module, as there is no requirement to capture and ignore output from a stream that has no importance to the enclosing application.
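The feedback loop in dynamicvariable.ssql (emit c+1, then update c from the emitted value) can be mimicked in plain Java. This is a conceptual sketch under the assumption that each tuple is fully processed, including the update of c, before the next one arrives; the class and method names are hypothetical.

```java
final class DynamicVariableSketch {

    // Plays the role of "DECLARE c int DEFAULT 0".
    private int c = 0;

    // Handles one input tuple: emits c + 1 as the ID, then feeds the emitted
    // value back into c, as the UPDATE FROM clause does via counterStream.
    int process() {
        int emitted = c + 1;
        c = emitted;
        return emitted;
    }

    public static void main(String[] args) {
        DynamicVariableSketch module = new DynamicVariableSketch();
        for (int i = 0; i < 3; i++) {
            System.out.println(module.process());
        }
    }
}
```

Because c is an instance field here, each new DynamicVariableSketch starts again from 0, which mirrors the per-instance state of a dynamic variable.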

The dynamic variable based approach, either as an EventFlow or StreamSQL implementation, is NOT SUITABLE in situations where you want to use multiple instances of the module in the same application. This is because each module instance creates its own instance of the dynamic variable, which leads to duplication of the unique ID values.

Using a Query Table to Generate a Unique ID 

A query table data construct may also be used to generate a unique ID. In this approach, a value stored in the table is incremented through a write/update operation and the updated value is returned as the new ID value. This approach has the following characteristics:

  • The initial ID can start at any value.
  • The ID can be incremented by any amount.
  • The ID type may be integer or double.
  • The current ID value may be written to a data store (a file created and managed by the StreamBase runtime engine) and retrieved on application restart.
  • The response time may be slower than the other approaches.
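The write/update cycle behind these characteristics can be sketched in Java with a map standing in for the query table. This is an illustration only: the class name, the single row key 0, and the constructor parameters are hypothetical, and persistence to a data store is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

final class QueryTableIdSketch {

    private static final int ROW_KEY = 0;  // a single row holds the last ID
    private final Map<Integer, Long> table = new HashMap<>();
    private final long firstId;
    private final long step;

    QueryTableIdSketch(long firstId, long step) {
        this.firstId = firstId;
        this.step = step;
    }

    // Inserts firstId when the row is absent; otherwise adds step to the
    // stored value. Either way the value now in the table is returned.
    long nextId() {
        return table.merge(ROW_KEY, firstId, (stored, ignored) -> stored + step);
    }

    public static void main(String[] args) {
        QueryTableIdSketch ids = new QueryTableIdSketch(100L, 5L);
        for (int i = 0; i < 3; i++) {
            System.out.println(ids.nextId());
        }
    }
}
```

The insert-or-update step is what lets the first tuple seed the table and every later tuple advance it, with the starting value and increment chosen freely.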

This approach is best implemented as an EventFlow module, where the critical steps involve how the query operator is configured to update the value stored in the query table. When developed as a StreamSQL module, easy reuse is not possible as there is no application-independent way of removing fields originating in the query table from the emitted tuple.

To implement this approach as an EventFlow module, create the application diagram shown in the following figure.

  • The EventFlow Module — querytable.sbapp:
  • Query Table Based Unique ID Module

  • The Input Stream Schema:
  • Query Table Based Unique ID Module - Input Stream Schema

  • The Query Table Edit Schema Tab:
  • Query Table Based Unique ID Module - Query Table Edit Schema Tab

  • The Query Table Primary Index Tab:
  • Query Table Based Unique ID Module - Query Table Primary Index Tab

  • The Query Operator Query Settings Tab:
  • Query Table Based Unique ID Module - Query Operator Query Settings Tab

  • The Query Operator Operation Settings Tab:
  • Query Table Based Unique ID Module - Query Operator Operation Settings Tab

  • The Query Operator Output Settings Tab:
  • Query Table Based Unique ID Module - Query Operator Output Settings Tab

  • The Map Operator Output Settings Tab:
  • Query Table Based Unique ID Module - Map Operator Output Settings Tab

To test this module, create an EventFlow application that uses this module as illustrated in the following screen shots.

  • The EventFlow Application:
  • Query Table Based Application

  • The Input Stream Schema:
  • Query Table Based Application - Input Stream Schema

  • The Module Reference Input Ports Tab:
  • Query Table Based Application - Module Reference Input Ports

A StreamSQL implementation of the query table based approach is illustrated in the following StreamSQL fragment.

  • The StreamSQL Module — querytable.ssql:
  •     CREATE INPUT STREAM InputStream (required_butUnused_SchemaField int);
        CREATE OUTPUT STREAM OutputStream;
        CREATE STREAM out__Query1_1;
    
        CREATE DISK TABLE QueryTable1 (
          LastId int,
          RowKey int, 
          PRIMARY KEY(RowKey) USING hash
        );
    
        INSERT INTO QueryTable1 (RowKey, LastId)
          SELECT 0 AS RowKey, 1 AS LastId
            FROM InputStream
              ON DUPLICATE KEY UPDATE LastId = LastId+1
          RETURNING *
            INTO out__Query1_1;
    
        SELECT *, LastId AS uniqueID
          FROM out__Query1_1
            INTO OutputStream;
      

However, in the StreamSQL SELECT statement (the last statement in the module), there is no facility to selectively drop tuple or table fields. That is, there is no way to pass fields from the incoming tuple but drop fields from the query table (which was what was accomplished in the configuration of the EventFlow map operator's Output Settings tab) other than to explicitly list the fields you want in the tuples emitted on the output stream. Consequently this module must be modified for each application, editing the schema of the input stream to match the schema of the incoming tuples and replacing the SELECT clause target list entry * with a listing of the tuple fields.

The query table based approach, either as an EventFlow or StreamSQL implementation, is NOT SUITABLE in situations where you want to use multiple instances of the module in the same application. This is because each module instance creates its own instance of the query table, which leads to duplication of the unique ID values.
