Publish-Subscribe

Ice provides a publish-subscribe service called IceStorm, which runs as a service within the IceBox framework. It is built entirely on basic client-server invocations. Again, very little application-specific code is required, as the following code summaries show.

Publisher

#include <Ice/Application.h>
#include <IceStorm/IceStorm.h>
#include <math.h>
#include <unistd.h>
#include <MessagesI.h>

using namespace std;
using namespace CODAC;

class Publisher : public Ice::Application
{
public:
   virtual int run(int, char*[]);
};

int
Publisher::run(int argc, char* argv[])
{
   // declarations (application)
   ......

   // initialize publisher and attach to topic (Ice calls)
   ......

   // initialize message buffer (application)
   ......

   // send messages (application)
   for (int m = 0; m < messageCount; m++) {
      // set message data
      ......

      // transmit message using MessagesI member functions
      //errorCount = twoway->publishDataMessageTwoWay(message);
      oneway->publishDataMessageOneWay(message);
      ......

      // record timing
      ......
   }

   // report results (application)
   ......

   return EXIT_SUCCESS;
}

int
main(int argc, char* argv[])
{
   Publisher app;
   return app.main(argc, argv, "config.pub");
}
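
The "record timing" step in the publisher loop above is elided. As a rough illustration only, the average cycle time of such a loop could be measured as sketched below. The names TimingResult and timePublishLoop are hypothetical, the send callback stands in for the call to oneway->publishDataMessageOneWay(message), and std::chrono::steady_clock is used here as a portable substitute for whatever high-resolution timer the original tests used.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>

// Result of a timed publishing run (hypothetical helper type).
struct TimingResult {
    std::size_t messages;   // number of messages sent
    double averageCycleMs;  // mean wall-clock time per loop iteration, in ms
};

// Calls `send` messageCount times and returns the average time per call.
// `send` is an assumed stand-in for oneway->publishDataMessageOneWay(message).
TimingResult timePublishLoop(std::size_t messageCount,
                             const std::function<void()>& send)
{
    using Clock = std::chrono::steady_clock;

    const Clock::time_point start = Clock::now();
    for (std::size_t m = 0; m < messageCount; ++m) {
        send();
    }
    const std::chrono::duration<double, std::milli> elapsed =
        Clock::now() - start;

    TimingResult result;
    result.messages = messageCount;
    // Guard against division by zero when no messages were sent.
    result.averageCycleMs =
        messageCount > 0 ? elapsed.count() / messageCount : 0.0;
    return result;
}
```

Measuring the whole loop and dividing by the message count, rather than timing each call, keeps the timer overhead out of the per-message figure, which matters for the smallest message sizes.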

Subscriber

#include <map>
#include <Ice/Application.h>
#include <IceStorm/IceStorm.h>
#include <IceUtil/UUID.h>
#include <MessagesI.h>

using namespace std;
using namespace CODAC;

class Subscriber : public Ice::Application
{
public:
   virtual int run(int, char*[]);
};

int
Subscriber::run(int argc, char* argv[])
{
   // initialize subscriber (Ice calls)
   ......

   // subscribe to topics (Ice calls)
   ......

   // set the requested quality of service (Ice calls)
   ......

   // activate subscriber and enter message processing loop (Ice calls)
   // call MessagesI member function when a message arrives
   ......

   // exit on subscriber shutdown (Ice calls)
   return EXIT_SUCCESS;
}

int
main(int argc, char* argv[])
{
   Subscriber app;
   return app.main(argc, argv, "config.sub");
}

Configuration

These extracts from the configuration files show how communication transport and endpoints are defined.

Topic Manager:

IceStorm.TopicManager.Endpoints=tcp -h 192.168.1.41 -p 10000

Publisher and Subscriber:

IceStorm.TopicManager.Proxy=IceStorm/TopicManager:tcp -h 192.168.1.41 -p 10000
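
To make the structure of the proxy string above explicit: the part before the colon is the object identity (category IceStorm, name TopicManager) and the part after it is the endpoint, giving the transport, the host (-h) and the port (-p). The sketch below splits such a string into those fields. It is purely illustrative: ProxyInfo and parseProxy are hypothetical names, it handles only this simple single-endpoint form, and Ice performs its own parsing internally.

```cpp
#include <sstream>
#include <string>

// Fields of a stringified proxy of the form
// "<identity>:<transport> -h <host> -p <port>" (hypothetical helper type).
struct ProxyInfo {
    std::string identity;   // e.g. "IceStorm/TopicManager"
    std::string transport;  // e.g. "tcp"
    std::string host;       // value of the -h option
    int port;               // value of the -p option, or -1 if absent
};

// Illustrative parser for the simple form shown in the configuration extract.
ProxyInfo parseProxy(const std::string& text)
{
    ProxyInfo info;
    info.port = -1;

    // Everything before the first ':' is the object identity.
    const std::string::size_type colon = text.find(':');
    info.identity = text.substr(0, colon);
    if (colon == std::string::npos) {
        return info;  // no endpoint part present
    }

    // The endpoint starts with the transport name, followed by options.
    std::istringstream in(text.substr(colon + 1));
    in >> info.transport;

    std::string option;
    while (in >> option) {
        if (option == "-h") {
            in >> info.host;
        } else if (option == "-p") {
            in >> info.port;
        }
    }
    return info;
}
```

Applied to the publisher and subscriber setting, this yields the identity IceStorm/TopicManager, transport tcp, host 192.168.1.41 and port 10000, which match the endpoint on which the topic manager listens.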

Running

The tests were run on bigbeast and storm08 to storm11 with the following commands:

bigbeast: icebox --Ice.Config=config.icebox
bigbeast: ./subscriber
storm08: ./publisher 1
storm09: ./publisher 2
storm10: ./publisher 3
storm11: ./publisher 4

Different message sizes were configured by changing constants in MessagesI.h, followed by recompilation.

Performance

Tests used oneway calls only, for maximum throughput. Note that the average throughput reported here is the combined throughput of the 4 publishers into the bonded Gigabit Ethernet interface on bigbeast. The maximum possible is 4*117 = 468 MB/s. The best throughput shown here is 282 MB/s, only 60% of that maximum. However, the recorded tests used a single-threaded icebox and subscriber, and 'top' showed only 2 of the 4 CPUs in use. When the icebox and subscriber were each configured to use 2 threads, with the byte array interface and a 100 kB message size, the throughput was 452 MB/s.

Test duration was several minutes, with publishers sending messages in a continuous loop. Time measurements were made using the publisher's high-resolution CPU timer. 1 MB = 1.0e6 bytes.

Byte Array Interface

Message size (MB)   Average cycle time (ms)   Average throughput (MB/s)
0.01                0.22                      178
0.1                 1.4                       282
1.0                 22.0                      180
10.0                247                       160

Float Array Interface

Message size (MB)   Average cycle time (ms)   Average throughput (MB/s)
0.01                0.24                      165
0.1                 1.6                       254
1.0                 22.5                      179
10.0                225                       178

Structure Interface

Message size (MB)   Average cycle time (ms)   Average throughput (MB/s)
0.01                0.25                      162
0.1                 1.88                      213
1.0                 23.0                      174
10.0                246                       162