Creating StreamBase C++ Clients

This topic describes how to create enqueue and dequeue C++ client applications for StreamBase applications.

Writing C++ Enqueue Clients

This section describes how to use the StreamBase C++ API to write a client application that enqueues data to a StreamBase Server.

Sample source code: streambase-directory/sample/compliance/preload.cpp

The basic procedure for enqueuing data into a StreamBase node in C++ is:

  1. Include StreamBaseClient.hpp

  2. Create an instance of the StreamBaseClient class. If needed, specify the String URI of the desired StreamBase node as an argument. For example:

    StreamBaseClient client("sb://localhost:10000/");

  3. For every tuple of data to enqueue:

    1. Retrieve a Schema object for each stream you want to enqueue. For example:

      Schema schema = client.getSchemaForStream("InputStream");

      ...where "InputStream" is the name of an input stream in your StreamBase application.

    2. Create a BufferedTuple of the given schema; in this example, the schema of the same stream as above:

      BufferedTuple tuple(schema);

    3. Set values for each of the tuple's fields. For example:

      tuple.setInt("myint",5);
      tuple.setString("mystring","hello");
      // ...
      // where "myint" and "mystring" are field names in the input stream
      // of your StreamBase application
      
    4. Enqueue the tuple onto the stream. For example:

      client.enqueue("InputStream",tuple);
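
Taken together, steps 3.1 through 3.4 can be wrapped in a single helper. The following is a minimal sketch, not code from the StreamBase samples. The name enqueueSample is hypothetical, and the function is written as a template over the client and tuple types so that it assumes only the calls shown in the steps above (getSchemaForStream, a tuple constructed from a schema, setInt, setString, and enqueue). With the StreamBase headers included, you would presumably instantiate it with StreamBaseClient and BufferedTuple.

```cpp
#include <string>

// Hypothetical helper (not part of the StreamBase API): builds one tuple for
// streamName and enqueues it. ClientT and TupleT are template parameters so
// that the sketch assumes only the calls shown in the steps above.
template <typename TupleT, typename ClientT>
void enqueueSample(ClientT& client, const std::string& streamName,
                   int myint, const std::string& mystring) {
    // Step 3.1: retrieve the schema of the input stream.
    auto schema = client.getSchemaForStream(streamName);

    // Step 3.2: create a tuple of that schema.
    TupleT tuple(schema);

    // Step 3.3: set the fields ("myint" and "mystring" are the example
    // field names used above).
    tuple.setInt("myint", myint);
    tuple.setString("mystring", mystring);

    // Step 3.4: enqueue the tuple onto the stream.
    client.enqueue(streamName, tuple);
}
```

A call might then look like enqueueSample<BufferedTuple>(client, "InputStream", 5, "hello"), assuming the stream and field names from the example above.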

Compiling a StreamBase C++ Client on UNIX

On a supported UNIX machine where StreamBase is installed, use the sb-config utility to set up the environment and define the compiler to use when compiling your program. For example:

CXX=`sb-config --cxx`
$CXX MyClient.cpp `sb-config --cflags` -c -o MyClient.o
$CXX MyClient.o `sb-config --libs` -o MyClient

Substitute the name of your client for MyClient.

Use the Makefiles in the following StreamBase samples as a guide to setting up your projects:

/opt/streambase/sample/custom-aggregate-function

/opt/streambase/sample/custom-simple-function

Compiling a StreamBase C++ Client on Windows

To build your C++ client application on Windows, you must configure Microsoft Visual Studio as described in Configuring Visual C++.

Buffering for C++ Enqueue Clients

For enqueue clients that use the StreamBase C++ Client Library API, you can enable tuple buffering and set the following parameters:

  • The buffer size (number of tuples per buffer)

  • The buffer's flush interval

You can also explicitly flush the buffer of a specified stream, or flush all buffers.

By enabling tuple buffering and experimenting with these parameters, you may be able to improve the efficiency and performance of your enqueue code. A single enqueue of a buffer containing (for example) 100 tuples should be more efficient than making 100 separate enqueue operations.

The examples in this section demonstrate how to use this feature. Here the buffer size is set to 100 tuples and the buffer flush interval is set to 1000 milliseconds (one second).

Notes: In the StreamBase client APIs, buffering is turned on only if the value of the buf_size parameter is greater than zero. (The buf_size parameter specifies a number of tuples, not a byte limit.) When buffering is on, the buffer is flushed at the regular interval, in milliseconds, specified by the flush_interval parameter. If flush_interval is not greater than zero, the buffer is flushed only when it reaches capacity (that is, when it is filled with tuples).
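
The rules in these notes can be modeled in a few lines. The following is an illustrative stand-in, not the StreamBase implementation: the class TupleBufferModel and all of its members are hypothetical, tuples are represented as plain strings, and time is advanced by hand instead of by a timer. It shows only when a buffer with a given buf_size and flush_interval would send its contents.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Illustrative model only, NOT the StreamBase implementation. It mirrors the
// rules in the notes above: buffering needs buf_size > 0, and periodic
// flushing needs flush_interval > 0.
class TupleBufferModel {
public:
    TupleBufferModel(int buf_size, int flush_interval_ms)
        : buf_size_(buf_size), flush_interval_ms_(flush_interval_ms) {}

    bool bufferingEnabled() const { return buf_size_ > 0; }
    bool periodicFlushEnabled() const {
        return bufferingEnabled() && flush_interval_ms_ > 0;
    }

    // Accepts one tuple. Without buffering, every tuple is its own send.
    void enqueue(const std::string& tuple) {
        if (!bufferingEnabled()) {
            ++sends_;
            ++tuples_sent_;
            return;
        }
        pending_.push_back(tuple);
        // A full buffer is flushed regardless of the flush interval.
        if (pending_.size() >= static_cast<std::size_t>(buf_size_)) flush();
    }

    // Advances simulated time; flushes once per elapsed flush interval.
    void advanceTime(int elapsed_ms) {
        if (!periodicFlushEnabled()) return;
        since_tick_ms_ += elapsed_ms;
        while (since_tick_ms_ >= flush_interval_ms_) {
            since_tick_ms_ -= flush_interval_ms_;
            flush();
        }
    }

    // Sends whatever is pending as one batch; does nothing if empty.
    void flush() {
        if (pending_.empty()) return;
        ++sends_;
        tuples_sent_ += pending_.size();
        pending_.clear();
    }

    std::size_t pending() const { return pending_.size(); }
    std::size_t sends() const { return sends_; }
    std::size_t tuplesSent() const { return tuples_sent_; }

private:
    int buf_size_;
    int flush_interval_ms_;
    int since_tick_ms_ = 0;
    std::vector<std::string> pending_;
    std::size_t sends_ = 0;
    std::size_t tuples_sent_ = 0;
};
```

With buf_size set to 100 and flush_interval set to 1000, enqueuing 250 tuples in this model produces two full-buffer sends and leaves 50 tuples pending until one second of simulated time has passed. With flush_interval set to 0, those 50 tuples would wait until the buffer filled or was flushed explicitly.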

There are two ways to turn on buffering in C++. They are shown in the two examples that follow.

Buffering Example 1

The first way to enable buffering with the StreamBase C++ client API is to specify the buffering parameters in the StreamBaseClient constructor. For example:

#include "StreamBase.hpp"
#include "StreamBaseClient.hpp"
#include "Tuple.hpp"

using namespace sb;
   .
   .
   .
        // grab the URI from the environment
        StreamBaseURI uri;
        uri = StreamBaseURI::fromEnvironment();

        // connect to the StreamBaseServer
        int buf_size = 100;
        int flush_interval = 1000;
        StreamBaseClient client(uri,buf_size,flush_interval);

Buffering Example 2

The second buffering alternative for C++ clients is to use the enableBuffering method. For example:

#include "StreamBase.hpp"
#include "StreamBaseClient.hpp"
#include "Tuple.hpp"

using namespace sb;
   .
   .
   .
        // grab the URI from the environment
        StreamBaseURI uri;
        uri = StreamBaseURI::fromEnvironment();

        // connect to the StreamBaseServer
        int buf_size = 100;
        int flush_interval = 1000;
        StreamBaseClient client(uri);
        client.enableBuffering(buf_size,flush_interval);

Note: If buffering is enabled and the flush_interval is set, the buffer will be flushed periodically based on the specified interval. However, to have more control over enqueuing buffered tuples, you can also use the flushBuffer(stream_name) method of the StreamBaseClient class to enqueue the contents of a buffer immediately. Or, you can use the flushAllBuffers() method to enqueue the contents of all buffers. These methods assume that the caller has a lock on _buffers_lock.

Generally, you flush a buffer when you are concerned that the tuples in it may become stale (what counts as stale depends on the application). For example, suppose your buffer size is 1000 and your flush_interval is 0, which turns off periodic flushing. The buffer is then flushed automatically only when it fills, that is, when the 1000th tuple has been enqueued. If 300 tuples have just been enqueued and it is unclear when, or whether, more tuples will arrive to fill the buffer, it is a good idea to call flushBuffer() or flushAllBuffers().
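
One way to act on this advice is to track how long buffered tuples have been waiting and flush once they have been idle for too long. The following is a minimal sketch under stated assumptions: IdleFlushPolicy, noteEnqueue, and poll are hypothetical names that are not part of the StreamBase API, and the flush action is passed in as a callback so that the sketch does not depend on the StreamBase headers.

```cpp
#include <chrono>
#include <functional>
#include <utility>

// Hypothetical helper, not part of the StreamBase API: decides when buffered
// tuples have waited long enough that an explicit flush is worthwhile.
class IdleFlushPolicy {
public:
    using Clock = std::chrono::steady_clock;

    IdleFlushPolicy(std::chrono::milliseconds max_idle,
                    std::function<void()> flush)
        : max_idle_(max_idle), flush_(std::move(flush)) {}

    // Call after each enqueue to record that unflushed tuples may exist.
    void noteEnqueue(Clock::time_point now = Clock::now()) {
        last_enqueue_ = now;
        dirty_ = true;
    }

    // Call periodically. Invokes the flush callback and returns true if
    // tuples may be pending and nothing has been enqueued for max_idle.
    bool poll(Clock::time_point now = Clock::now()) {
        if (!dirty_ || now - last_enqueue_ < max_idle_) return false;
        flush_();
        dirty_ = false;
        return true;
    }

private:
    std::chrono::milliseconds max_idle_;
    std::function<void()> flush_;
    Clock::time_point last_enqueue_{};
    bool dirty_ = false;
};
```

In a real client, the callback would presumably wrap a call such as client.flushAllBuffers(), and poll() would be called from the same thread that enqueues. The policy does not know whether the buffer has already been flushed because it filled, so it may occasionally request a flush of an empty buffer; this sketch assumes that such a call is harmless.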

Writing C++ Dequeue Clients

This section describes how to use the StreamBase C++ API to write a client application that dequeues data from a StreamBase Server.

Note: You can make your application dequeue a subset of the server output instead of all the output. For details, see the topic, Narrowing Dequeue Results with Filtered Subscribe.

Performance Note: Slow dequeue (consumer) clients may eventually be disconnected from the StreamBase Server, sbd. StreamBase uses a memory parameter, called Maximum pages per client connection in the IDE and max-client-pages in the server's *.sbconf configuration file, to limit the memory used by each client connection. The sbd process disconnects clients that try to allocate more memory than this limit. The limit is designed to protect sbd from a slow or hung dequeue client. For more information, see Defining a StreamBase Server Configuration in the Administration Guide.

Sample source code: streambase-directory/sample/compliance/outputviewer.cpp

The basic procedure for dequeuing data from a StreamBase node in C++ is as follows:

  1. Include StreamBaseClient.hpp

  2. Create an instance of the StreamBaseClient class. If needed, specify the String URI of the desired StreamBase node as an argument. For example:

    StreamBaseClient client("sb://localhost:10000/");

  3. Subscribe to each stream that you want to dequeue. For example:

    client.subscribe("OutputStream");

    ...where OutputStream is the name of an output stream in your StreamBase application.

  4. Call the dequeue(String s) method on the client, which blocks until a tuple becomes available. This method returns a list of Tuples dequeued from one of the streams to which you subscribed, and sets the string argument to the name of that stream. For example:

    string stream;
    ConstTupleList tuples = client.dequeue(stream);
    //stream is now "OutputStream"
    
  5. If the Tuple list is empty, the server (or your client from another thread) is requesting that you close the connection; you should stop calling dequeue and return from your dequeuing code. For example:

    if (tuples.empty()) return;
    
  6. Otherwise, you can do the following:

    1. Loop over the Tuple list. For example:

      for (size_t i = 0; i < tuples.size(); ++i) {
      
    2. Retrieve each Tuple as a ConstTuple object. For example:

      ConstTuple tuple = tuples[i];
      
    3. ...And use it to access the tuple's constituent fields. For example:

      int myint = tuple.getInt("myint");
      string mystring = tuple.getString("mystring");
      // ...
      // where "myint" and "mystring" are field names in the output stream
      // of your StreamBase application.
      
  7. If you wish to cancel the blocking dequeue call from any thread (for example, if your client is shutting down), call client.close(). In fact, you should always call the client.close() method before exiting, to flush the client network buffers.

    Important

    Note that the StreamBase client API is not thread-safe. You cannot share a client connection across threads. For example, if you attempt to close a client from a thread other than the one in which it was created, a stack overflow error will occur. To run multi-threaded clients, establish a separate client connection for each thread.
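
Steps 4 through 6 can be assembled into one loop. The following is a minimal sketch, not code from the StreamBase samples. The name dequeueUntilClosed and the handleTuple callback are hypothetical, and the function is a template over the client type so that it assumes only the dequeue call and the list operations (empty, size, and indexing) shown in the steps above.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of the StreamBase API): runs the dequeue loop
// from steps 4 to 6 until the client returns an empty tuple list, then
// returns the number of tuples that were handled.
template <typename ClientT, typename HandlerT>
std::size_t dequeueUntilClosed(ClientT& client, HandlerT handleTuple) {
    std::size_t seen = 0;
    for (;;) {
        // Step 4: dequeue() blocks, then sets 'stream' to the stream name.
        std::string stream;
        auto tuples = client.dequeue(stream);

        // Step 5: an empty list is the signal to stop dequeuing.
        if (tuples.empty()) break;

        // Step 6: hand each tuple, with its stream name, to the caller.
        for (std::size_t i = 0; i < tuples.size(); ++i) {
            handleTuple(stream, tuples[i]);
            ++seen;
        }
    }
    return seen;
}
```

The handler is where field access such as tuple.getInt("myint") would go. After the loop returns, the caller is still expected to call client.close() from the thread that created the client.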

Compiling C++ Dequeue Clients

The procedure for compiling dequeue clients is the same as described above for enqueue clients. Remember to always call client.close() to flush the client network buffers before exiting.
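
One way to make sure close() is never skipped, including on early returns and exceptions, is a small scope guard. The following is a sketch of a common C++ idiom and is not part of the StreamBase API: ScopedClose is a hypothetical name, and the only assumption about the client type is that it has the close() method described above.

```cpp
// Hypothetical RAII guard, not part of the StreamBase API: calls close() on
// the wrapped client when the guard goes out of scope.
template <typename ClientT>
class ScopedClose {
public:
    explicit ScopedClose(ClientT& client) : client_(client) {}

    ~ScopedClose() {
        try {
            client_.close();
        } catch (...) {
            // Swallow errors: a destructor must not throw.
        }
    }

    // Non-copyable, so that close() runs exactly once per guard.
    ScopedClose(const ScopedClose&) = delete;
    ScopedClose& operator=(const ScopedClose&) = delete;

private:
    ClientT& client_;
};
```

Declare the guard immediately after the client, in the same scope and the same thread, for example ScopedClose<StreamBaseClient> guard(client);. The guard is then destroyed before the client and in the thread that created it, which is consistent with the threading restriction noted earlier.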

Deploying C++ Clients to Non-StreamBase Windows Systems

C++ clients can be installed on systems that do not have StreamBase installed. However, you also must include the following files in the directory with your client executable, or in a directory on the PATH:

  • The basic Visual Studio VC 7.1 runtime libraries, msvcp71.dll and msvcr71.dll

  • pthreads.dll

If you are deploying StreamBase C++ VC8 clients to systems without StreamBase installed, you must include an additional file and perform some additional configuration. Specifically:

  • The Microsoft Visual Studio 2005 C++ Redistributables, or the equivalent, must be installed on the system.

  • The file, pthreads-vc8.dll, must be in the directory with your client, or in a directory on your PATH.

Microsoft occasionally issues service packs for Visual Studio .NET. If you build a C++ client with a particular service pack version of Visual Studio, you must install the corresponding version of the Microsoft Visual C++ Redistributable package on your non-StreamBase deployment system. For example, if you build a C++ client with Visual Studio .NET 2005 SP1 (VC 8.0 SP1), you must install the SP1 version of the Redistributable package for VC 8.0.