This topic describes how to create enqueue and dequeue C++ client applications for StreamBase applications.
This section describes how to use the StreamBase C++ API to write a client application that enqueues data to a StreamBase Server.
Sample source code: streambase-directory/sample/compliance/preload.cpp
The basic procedure for enqueuing data into a StreamBase node in C++ is:
-
Include
StreamBaseClient.hpp -
Create an instance of the
StreamBaseClientclass. If needed, specify the String URI of the desired StreamBase node as an argument. For example:StreamBaseClient client("sb://localhost:10000/"); -
For every tuple of data to enqueue:
-
Retrieve a
Schemaobject for each stream you want to enqueue. For example:Schema schema = client.getSchemaForStream("InputStream");...where "
InputStream" is the name of an input stream in your StreamBase application. -
Create a
BufferedTupleof the given schema; in this example, the schema of the same stream as above:BufferedTuple tuple(schema); -
Set values for each of the tuple's fields. For example:
tuple.setInt("myint",5); tuple.setString("mystring","hello"); // ... // where "myint" and "mystring" are field names in the input stream // of your StreamBase application -
Enqueue the tuple onto the stream. For example:
client.enqueue("InputStream",tuple);
-
On a supported UNIX machine
where StreamBase is installed, use the sb-config utility to set up the environment and define the compiler
to use when compiling your program. For example:
CXX=`sb-config --cxx` $CXX MyClient.cpp `sb-config --cflags` -c -oMyClient.o $CXX MyClient.o `sb-config --libs` -oMyClient
Substitute the name of your client for MyClient.
Use the Makefiles in the following StreamBase samples as a guide to setting up your projects:
/opt/streambase/sample/custom-aggregate-function
/opt/streambase/sample/custom-simple-function
To build your C++ client application on Windows, you must configure Microsoft Visual Studio as described in Configuring Visual C++.
For enqueue clients that use the StreamBase C++ Client Library API, you can enable tuple buffering and set the following parameters:
-
The buffer size (number of tuples per buffer)
-
The buffer's flush interval
You can also explicitly flush the buffer of a specified stream, or flush all buffers.
By enabling tuple buffering and experimenting with these parameters, you may be able to improve the efficiency and performance of your enqueue code. A single enqueue of a buffer containing (for example) 100 tuples should be more efficient than making 100 separate enqueue operations.
The examples in this section demonstrate how to use this feature. Here the buffer
size is set to 100 tuples and the buffer flush interval
is set to 1000 milliseconds (one second).
Notes: In the StreamBase
client APIs, buffering is only turned on if the value for the buf_size parameter is greater than zero. (The buf_size parameter specifies the number of tuples, not a byte
limit.) Also the buffer is flushed at a regular interval specified by the
flush_interval parameter. If the flush_interval is not greater than zero, the buffer will only be
flushed when it reaches capacity (it is filled with tuples).
There are two ways to turn on buffering in C++. They are shown in the two examples that follow.
The first buffering method using the StreamBase C++ client API is to
specify the buffering parameters in the StreamBaseClient constructor. For example:
#include "StreamBase.hpp"
#include "StreamBaseClient.hpp"
#include "Tuple.hpp"
using namespace sb;
.
.
.
// grab the URI, either from the command line or the environment
StreamBaseURI uri;
uri = StreamBaseURI::fromEnvironment();
// connect to the StreamBaseServer
int buf_size = 100;
int flush_interval = 1000;
StreamBaseClient client(uri,buf_size,flush_interval);
The second buffering alternative for C++ clients is to use the enableBuffering method. For example:
#include "StreamBase.hpp"
#include "StreamBaseClient.hpp"
#include "Tuple.hpp"
using namespace sb;
.
.
.
// grab the URI, either from the environment
StreamBaseURI uri;
uri = StreamBaseURI::fromEnvironment();
// connect to the StreamBaseServer
int buf_size = 100;
int flush_interval = 1000;
StreamBaseClient client(uri);
client.enableBuffering(buf_size,flush_interval);
Note: If buffering is enabled and the
flush_interval is set, the buffer will be flushed
periodically based on the specified interval. However, to have more control over
enqueuing buffered tuples, you can also use the flushBuffer(stream_name) method of the StreamBaseClient class to enqueue the contents of a buffer
immediately. Or, you can use the flushAllBuffers()
method to enqueue the contents of all buffers. These methods assume that the caller
has a lock on _buffers_lock.
Generally you want to "flush" a buffer when you are concerned that the tuples in
the buffer may become stale (staleness is relative to the particular application).
Let's say, for example, your buffer size is 1000 and your flush_interval is 0 (this value turns off periodic flushing). The
buffer will automatically be flushed when it is filled. That is, when the 1000th
tuple has been enqueued. If 300 tuples have just been enqueued and it is unclear
when or if more tuples will arrive to fill the buffer, it would be a good idea to
call flushBuffer() or flushAllBuffers().
This section describes how to use the StreamBase C++ API to write a client application that dequeues data from a StreamBase Server.
Note: You can make your application dequeue a subset of the server output instead of all the output. For details, see the topic, Narrowing Dequeue Results with Filtered Subscribe.
Performance Note: Dequeue (producer)
clients that are slow may eventually get disconnected from
the StreamBase Server, sbd. StreamBase uses a memory parameter called
Maximum pages per client connection in the IDE, and
max-client-pages in the server's *.sbconf configuration file. The sbd process will disconnect clients that try to allocate
more memory than the limit set by this parameter. It is designed to protect
sbd from a slow or hung dequeue client.
For more information, see Defining a StreamBase Server Configuration in the Administration Guide.
Sample source code: streambase-directory/sample/compliance/outputviewer.cpp
The basic procedure for dequeuing data from a StreamBase node in C++ is as follows:
-
Include
StreamBaseClient.hpp -
Create an instance of the
StreamBaseClientclass. If needed, specify the String uri of the desired StreamBase node as an argument. For example:StreamBaseClient client("sb://localhost:10000/"); -
Subscribe to each stream that you want to dequeue. For example:
client.subscribe("OutputStream");...where
OutputStreamis the name of an output stream in your StreamBase application. -
Call the
dequeue(String s)method on the client, which blocks until a tuple becomes available. This method returns a list of Tuples dequeued from each of the streams to which you subscribed, and whose name you set with the string argument. For example:string stream; ConstTupleList tuples = client.dequeue(stream); //stream is now "OutputStream"
-
If the Tuple list is empty, the server (or your client from another thread) is requesting that you close the connection; you should exit the dequeue method. For example:
if (tuples.empty()) return;
-
Otherwise, you can do the following:
-
Loop over the Tuple list. For example:
for (size_t i = 0; i < tuples.size(); ++i) { -
Retrieve each Tuple as a
ConstTupleobject. For example:ConstTuple tuple = tuples[i];
-
...And use it to access the tuple's constituent fields. For example:
int myint = tuple.getInt("myint"); string mystring = tuple.getString("mystring"); // ... // where "myint" and "mystring" are field names in the output stream // of your StreamBase application.
-
-
If you wish to cancel the blocking
dequeuecall from any thread, (for example if your client is shutting down), callmetclient.close();. In fact, before exiting you should always call theclient.close()method to flush the client network buffers.Important
Note that the StreamBase client API is not thread-safe. You cannot share a client connection across threads. For example, if you attempt to close a client from a thread other than the one in which it was created, a stack overflow error will occur. To run clients multi-threaded, you would need to establish a new client for each thread.
The procedure for compiling dequeue clients is the same as described above for enqueue clients. Remember to
always call client.close() to flush the client network buffers before
exiting.
C++ clients can be installed on systems that do not have StreamBase installed. However, you also must include the following files in the directory with your client executable, or in a directory on the PATH:
-
The basic Visual Studio VC 7.1 runtime libraries,
msvcp71.dllandmsvcr71.dll -
pthreads.dll
If you are deploying StreamBase C++ VC8 clients to systems without StreamBase installed, you must include an additional file and perform some additional configuration. Specifically:
-
The Microsoft Visual Studio 2005 C++ Redistributables, or the equivalent, must be installed on the system.
-
The file,
pthreads-vc8.dll, must bein the directory with your client, or in a directory on your PATH.
Microsoft occasionally issues service packs for Visual Studio .NET. If you build a C++ client with a particular service pack version of Visual Studio, you must install the corresponding version of the Microsoft Visual C++ Redistributable package on your non-StreamBase deployment system. For example, if you build a C++ client with Visual Studio .NET 2005 SP1 (VC 8.0 SP1), you must install the SP1 version of the Redistributable package for VC 8.0.
-
Creating Java Clients in the API Guide
