Changeset 304
- Timestamp:
- 02/25/08 06:28:26 (9 months ago)
- Location:
- trunk/thrucommon
- Files:
-
- 4 added
- 5 modified
-
Makefile.am (modified) (1 diff)
-
configure.in (modified) (2 diffs)
-
src/Makefile.am (modified) (2 diffs)
-
src/SpreadConnection.cpp (modified) (15 diffs)
-
src/SpreadConnection.h (modified) (4 diffs)
-
tests (added)
-
tests/Makefile.am (added)
-
tests/SpreadTest.cpp (added)
-
tests/test.conf (added)
Legend:
- Unmodified
- Added
- Removed
-
trunk/thrucommon/Makefile.am
r182 r304 1 SUBDIRS = src 1 SUBDIRS = src tests -
trunk/thrucommon/configure.in
r251 r304 13 13 AC_PROG_LIBTOOL 14 14 AC_PROG_MAKE_SET 15 16 # libcppunit, optional 17 AC_ARG_WITH(tests, 18 AC_HELP_STRING([--with-tests], 19 [build test suite support, requires cppunit (default is no]), 20 [with_memcached=$withval], 21 [with_memcached=no]) 22 if test "x$with_tests" = "xyes"; then 23 AM_PATH_CPPUNIT(1.9.6) 24 if test "x$no_cppunit" != "xyes" ; then 25 AC_DEFINE([HAVE_CPPUNIT], [1], [Define to 1 if you have the cppunit library (-lcppunit)]) 26 fi 27 fi 15 28 16 29 AC_LANG(C++) … … 100 113 AC_CONFIG_FILES([Makefile 101 114 src/Makefile 102 src/thrucommon.pc]) 115 src/thrucommon.pc 116 tests/Makefile]) 103 117 AC_OUTPUT -
trunk/thrucommon/src/Makefile.am
r281 r304 17 17 ThruFileTransport.h \ 18 18 RecoveryManager.h \ 19 SpreadConnection.h \ 19 20 SpreadManager.h \ 20 21 SpreadTaskFactory.h \ … … 35 36 ThruFileTransport.cpp \ 36 37 RecoveryManager.cpp \ 38 SpreadConnection.cpp \ 37 39 SpreadManager.cpp \ 38 40 Transaction.cpp \ -
trunk/thrucommon/src/SpreadConnection.cpp
r303 r304 2 2 #include "SpreadConnection.h" 3 3 4 using namespace log4cxx; 4 5 using namespace std; 5 6 6 7 string SP_error_to_string (int error); 8 9 LoggerPtr SpreadConnection::logger (Logger::getLogger ("SpreadConnection")); 7 10 8 11 SpreadConnection::SpreadConnection (const string & name, … … 39 42 SpreadConnection::~SpreadConnection () 40 43 { 44 LOG4CXX_INFO (logger, "~SpreadConnection"); 41 45 map<string, vector<string> >::iterator i; 42 46 for (i = this->groups.begin (); … … 51 55 void SpreadConnection::join (const std::string & group) 52 56 { 57 LOG4CXX_DEBUG (logger, "join: group=" + group); 53 58 map<string, vector<string> >::iterator i; 54 59 i = this->groups.find (group.c_str ()); 55 60 if (i != this->groups.end ()) 56 61 { 57 // TODO: throw an exception62 LOG4CXX_DEBUG (logger, " already a member"); 58 63 } 59 64 else … … 70 75 // that's handled elsewhere, as are membership changes 71 76 this->groups[group]; 77 LOG4CXX_DEBUG (logger, " joined"); 72 78 } 73 79 } … … 75 81 void SpreadConnection::leave (const std::string & group) 76 82 { 83 LOG4CXX_DEBUG (logger, "leave: group=" + group); 77 84 map<string, vector<string> >::iterator i; 78 85 i = this->groups.find (group.c_str ()); … … 80 87 { 81 88 SP_leave (this->mbox, group.c_str ()); 89 // TODO: this will unsubscribe everyone... 82 90 this->groups.erase (i); 91 LOG4CXX_DEBUG (logger, " left"); 83 92 } 84 93 } … … 86 95 void SpreadConnection::subscribe (const string & sender, const string & group, 87 96 const int message_type, 88 subscriber_callback callback) 89 { 97 SubscriberCallbackInfo * callback) 98 { 99 char buf[256]; 100 sprintf (buf, "subscribe: sender=%s, group=%s, message_type=%d", 101 sender.c_str (), group.c_str (), message_type); 102 LOG4CXX_DEBUG (logger, buf); 90 103 if (!group.empty ()) 91 104 { … … 101 114 const char * message, const int message_len) 102 115 { 116 if (logger->isDebugEnabled ()) 117 { 118 char buf[128]; 119 sprintf (buf, "queue: group=%s, message_type=%d", group.c_str (), 120 message_type); 121 LOG4CXX_DEBUG (logger, buf); 122 } 103 123 QueuedMessage * queued_message = 104 124 (QueuedMessage*)malloc (sizeof (QueuedMessage)); 105 queued_message->group = group;125 queued_message->group = strndup (group.c_str (), group.length ()); 106 126 queued_message->message_type = message_type; 107 127 queued_message->message = strndup (message, message_len); … … 112 132 void SpreadConnection::run (int count) 113 133 { 134 if (logger->isDebugEnabled ()) 135 { 136 char buf[64]; 137 sprintf (buf, "run: count=%d", count); 138 LOG4CXX_DEBUG (logger, buf); 139 } 114 140 // send out any pending message 115 141 this->drain_pending (); … … 129 155 char buf[buf_size]; 130 156 131 int i ;157 int i = 0; 132 158 while (!count || i < count) 133 159 { 160 LOG4CXX_DEBUG (logger, "run: receiving"); 134 161 buf_len = SP_receive (this->mbox, &service_type, sender, 135 162 max_groups, &num_groups, groups, &type, 136 163 &endian_mismatch, buf_size, buf); 164 // TODO: tmp 165 { 166 char buf[128]; 167 sprintf (buf, "recv: buf_len=%d, sender=%s, group=%s, type=%d", 168 buf_len, sender, groups[0], type); 169 LOG4CXX_DEBUG (logger, buf); 170 } 137 171 if (buf_len > 0) 138 172 { … … 173 207 // drain any pending messages 174 208 this->drain_pending (); 175 } 176 } 177 178 void SpreadConnection::make_callbacks (vector<subscriber_callback> callbacks, 179 const string & sender, 180 const string & group, 181 const int message_type, 182 const char * message, 183 const int message_len) 184 { 209 210 i++; 211 } 212 LOG4CXX_DEBUG (logger, "run: done"); 213 } 214 215 void SpreadConnection::make_callbacks 216 (vector<SubscriberCallbackInfo *> callbacks, const string & sender, 217 const string & group, const int message_type, const char * message, 218 const int message_len) 219 { 220 if (logger->isDebugEnabled ()) 221 { 222 char buf[256]; 223 sprintf (buf, "make_callbacks: callbacks.size=%d, sender=%s, group=%s, message_type=%d", 224 (int)callbacks.size (), sender.c_str (), group.c_str (), 225 message_type); 226 LOG4CXX_DEBUG (logger, buf); 227 } 185 228 bool ret; 186 vector< subscriber_callback>::iterator i;229 vector<SubscriberCallbackInfo *>::iterator i; 187 230 for (i = callbacks.begin (); i != callbacks.end (); i++) 188 231 { 189 ret = (*i)(this, sender, group, message_type, message, message_len); 232 subscriber_callback call = (*i)->callback; 233 void * data = (*i)->data; 234 ret = (call)(this, sender, group, message_type, message, 235 message_len, data); 236 ret = ((*i)->callback)(this, sender, group, message_type, message, 237 message_len, (*i)->data); 190 238 if (!ret) 191 239 callbacks.erase (i); … … 197 245 const int message_len) 198 246 { 247 if (logger->isDebugEnabled ()) 248 { 249 char buf[128]; 250 sprintf (buf, "dispatch: sender=%s, group=%s, message_type=%d", 251 sender.c_str (), group.c_str (), message_type); 252 LOG4CXX_DEBUG (logger, buf); 253 } 254 199 255 // if we haven't installed any callbacks, there's no reason to continue 200 256 if (this->subscriptions.empty ()) … … 202 258 203 259 // TODO: recieve self flag 204 if (this->private_group == sender )260 if (this->private_group == sender && !this->receive_self) 205 261 return; 206 262 … … 208 264 // C++ is compared to perl where this is 8 lines of code :( 209 265 210 map<string, map<string, map<int, vector<subscriber_callback> > > >::iterator 211 s = this->subscriptions.find (sender); 266 map<string, map<string, map<int, 267 vector<SubscriberCallbackInfo *> > > >::iterator s = 268 this->subscriptions.find (sender); 212 269 if (s != this->subscriptions.end ()) 213 270 { 214 map<string, map<int, vector<subscriber_callback> > >::iterator s_g = 215 (*s).second.find (group); 271 map<string, map<int, 272 vector<SubscriberCallbackInfo *> > >::iterator s_g = 273 (*s).second.find (group); 216 274 if (s_g != (*s).second.end ()) 217 275 { 218 map<int, vector< subscriber_callback> >::iterator s_g_t =276 map<int, vector<SubscriberCallbackInfo *> >::iterator s_g_t = 219 277 (*s_g).second.find (message_type); 220 278 if (s_g_t != (*s_g).second.end ()) 221 279 { 222 280 // we have a sender, group, type match 223 this->make_callbacks ((*s_g_t).second, sender, group, 224 message_type, message, message_len); 225 } 226 map<int, vector< subscriber_callback> >::iterator s_g_et =281 this->make_callbacks ((*s_g_t).second, sender, group, 282 message_type, message, message_len); 283 } 284 map<int, vector<SubscriberCallbackInfo *> >::iterator s_g_et = 227 285 (*s_g).second.find (-1); 228 286 if (s_g_et != (*s_g).second.end ()) 229 287 { 230 288 // we have a sender, group, empty type match 231 this->make_callbacks ((*s_g_et).second, sender, group, 232 message_type, message, message_len); 233 } 234 } 235 map<string, map<int, vector<subscriber_callback> > >::iterator s_eg = 236 (*s).second.find (""); 289 this->make_callbacks ((*s_g_et).second, sender, group, 290 message_type, message, message_len); 291 } 292 } 293 map<string, map<int, 294 vector<SubscriberCallbackInfo *> > >::iterator s_eg = 295 (*s).second.find (""); 237 296 if (s_eg != (*s).second.end ()) 238 297 { 239 map<int, vector< subscriber_callback> >::iterator s_eg_t =298 map<int, vector<SubscriberCallbackInfo *> >::iterator s_eg_t = 240 299 (*s_eg).second.find (message_type); 241 300 if (s_eg_t != (*s_eg).second.end ()) 242 301 { 243 302 // we have a sender, empty group, type match 244 this->make_callbacks ((*s_eg_t).second, sender, group, 245 message_type, message, message_len); 246 } 247 map<int, vector< subscriber_callback> >::iterator s_eg_et =303 this->make_callbacks ((*s_eg_t).second, sender, group, 304 message_type, message, message_len); 305 } 306 map<int, vector<SubscriberCallbackInfo *> >::iterator s_eg_et = 248 307 (*s_eg).second.find (-1); 249 308 if (s_eg_et != (*s_eg).second.end ()) 250 309 { 251 310 // we have a sender, empty group, empty type match 252 this->make_callbacks ((*s_eg_et).second, sender, group, 253 message_type, message, message_len); 254 } 255 } 256 } 257 map<string, map<string, map<int, vector<subscriber_callback> > > >::iterator 258 es = this->subscriptions.find (""); 311 this->make_callbacks ((*s_eg_et).second, sender, group, 312 message_type, message, message_len); 313 } 314 } 315 } 316 map<string, map<string, map<int, 317 vector<SubscriberCallbackInfo *> > > >::iterator 318 es = this->subscriptions.find (""); 259 319 if (es != this->subscriptions.end ()) 260 320 { 261 map<string, map<int, vector<subscriber_callback> > >::iterator es_g = 262 (*es).second.find (group); 321 map<string, map<int, 322 vector<SubscriberCallbackInfo *> > >::iterator es_g = 323 (*es).second.find (group); 263 324 if (es_g != (*es).second.end ()) 264 325 { 265 map<int, vector< subscriber_callback> >::iterator es_g_t =326 map<int, vector<SubscriberCallbackInfo *> >::iterator es_g_t = 266 327 (*es_g).second.find (message_type); 267 328 if (es_g_t != (*es_g).second.end ()) 268 329 { 269 330 // we have a empty sender, group, type match 270 this->make_callbacks ((*es_g_t).second, sender, group, 271 message_type, message, message_len); 272 } 273 map<int, vector< subscriber_callback> >::iterator es_g_et =331 this->make_callbacks ((*es_g_t).second, sender, group, 332 message_type, message, message_len); 333 } 334 map<int, vector<SubscriberCallbackInfo *> >::iterator es_g_et = 274 335 (*es_g).second.find (-1); 275 336 if (es_g_et != (*es_g).second.end ()) 276 337 { 277 338 // we have a empty sender, group, empty type match 278 this->make_callbacks ((*es_g_et).second, sender, group, 279 message_type, message, message_len); 280 } 281 } 282 map<string, map<int, vector<subscriber_callback> > >::iterator es_eg = 339 this->make_callbacks ((*es_g_et).second, sender, group, 340 message_type, message, message_len); 341 } 342 } 343 map<string, map<int, 344 vector<SubscriberCallbackInfo *> > >::iterator es_eg = 283 345 (*es).second.find (""); 284 346 if (es_eg != (*es).second.end ()) 285 347 { 286 map<int, vector< subscriber_callback> >::iterator es_eg_t =348 map<int, vector<SubscriberCallbackInfo *> >::iterator es_eg_t = 287 349 (*es_eg).second.find (message_type); 288 350 if (es_eg_t != (*es_eg).second.end ()) 289 351 { 290 352 // we have a empty sender, empty group, type match 291 this->make_callbacks ((*es_eg_t).second, sender, group, 292 message_type, message, message_len); 293 } 294 map<int, vector< subscriber_callback> >::iterator es_eg_et =353 this->make_callbacks ((*es_eg_t).second, sender, group, 354 message_type, message, message_len); 355 } 356 map<int, vector<SubscriberCallbackInfo *> >::iterator es_eg_et = 295 357 (*es_eg).second.find (-1); 296 358 if (es_eg_et != (*es_eg).second.end ()) 297 359 { 298 360 // we have a empty sender, empty group, empty type match 299 this->make_callbacks ((*es_eg_et).second, sender, group, 361 this->make_callbacks ((*es_eg_et).second, sender, group, 300 362 message_type, message, message_len); 301 363 } … … 306 368 void SpreadConnection::drain_pending () 307 369 { 370 if (logger->isDebugEnabled ()) 371 { 372 char buf[64]; 373 sprintf (buf, "drain_pending: pending_messages.size=%d", 374 (int)this->pending_messages.size ()); 375 LOG4CXX_DEBUG (logger, buf); 376 } 308 377 int ret; 309 378 QueuedMessage * qm; 310 379 while (!this->pending_messages.empty ()) 311 380 { 381 LOG4CXX_DEBUG (logger, "drain_pending: draining"); 312 382 qm = this->pending_messages.front (); 313 383 this->pending_messages.pop (); 314 ret = SP_multicast (this->mbox, SAFE_MESS | SELF_DISCARD, 315 qm->group .c_str (), qm->message_type,384 ret = SP_multicast (this->mbox, SAFE_MESS | SELF_DISCARD, 385 qm->group, qm->message_type, 316 386 qm->message_len, qm->message); 317 387 // TODO: error handling 388 free (qm->group); 318 389 free (qm->message); 319 390 free (qm); 320 391 } 392 LOG4CXX_DEBUG (logger, "drain_pending: done"); 321 393 } 322 394 -
trunk/thrucommon/src/SpreadConnection.h
r303 r304 17 17 /* callbacks return true to stay installed, false to be removed */ 18 18 typedef bool (*subscriber_callback) (SpreadConnection * spread_connection, 19 const std::string & sender, 20 const std::string & group, 21 const int message_type, 22 const char * message, 23 const int message_len); 19 const std::string & sender, 20 const std::string & group, 21 const int message_type, 22 const char * message, 23 const int message_len, 24 void * data); 25 typedef struct SubscriberCallbackInfo SubscriberCallbackInfo; 26 struct SubscriberCallbackInfo 27 { 28 subscriber_callback callback; 29 void * data; 30 }; 24 31 25 32 struct QueuedMessage 26 33 { 27 std::stringgroup;34 char * group; 28 35 int message_type; 29 36 char * message; … … 34 41 { 35 42 public: 36 SpreadConnection (const std::string & name, 43 SpreadConnection (const std::string & name, 37 44 const std::string & private_name); 38 45 ~SpreadConnection (); 39 46 40 47 void subscribe (const std::string & sender, const std::string & group, 41 const int message_type, subscriber_callback callback); 48 const int message_type, 49 SubscriberCallbackInfo * callback); 42 50 void queue (const std::string & group, const int message_type, 43 51 const char * message, const int message_len); 44 52 void run (int count); 53 54 void set_receive_self (bool receive_self) 55 { 56 this->receive_self = receive_self; 57 } 58 59 std::string get_private_group () 60 { 61 return this->private_group; 62 } 45 63 46 64 private: … … 51 69 std::string group; 52 70 std::string private_group; 71 bool receive_self; 53 72 mailbox mbox; 54 73 std::map<std::string, std::vector<std::string> > groups; 55 std::map<std::string, std::map<std::string, 56 std:: map<int, std::vector<subscriber_callback> > > > subscriptions;74 std::map<std::string, std::map<std::string, std::map<int, 75 std::vector<SubscriberCallbackInfo *> > > > subscriptions; 57 76 std::queue<QueuedMessage *> pending_messages; 58 77 … … 62 81 const int message_type, const char * message, 63 82 const int message_len); 64 void make_callbacks (std::vector< subscriber_callback> callbacks,65 const std::string & sender, 83 void make_callbacks (std::vector<SubscriberCallbackInfo *> callbacks, 84 const std::string & sender, 66 85 const std::string & group, 67 86 const int message_type, const char * message,
