Changeset 313
- Timestamp:
- 02/27/08 00:43:56 (9 months ago)
- Location:
- trunk
- Files:
-
- 1 added
- 5 modified
-
thrucommon/src/ReplicationRecorder.cpp (modified) (2 diffs)
-
thrucommon/src/Spread.cpp (modified) (3 diffs)
-
thrucommon/src/Spread.h (modified) (1 diff)
-
thrucommon/src/thrurecorder.conf (added)
-
thrudoc/src/ReplicationBackend.cpp (modified) (3 diffs)
-
thrudoc/src/ReplicationBackend.h (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
trunk/thrucommon/src/ReplicationRecorder.cpp
r311 r313 47 47 live_callback_info); 48 48 49 // subscribe to "replay" messages 49 // subscribe to "replay" messages, both broadcast and direct 50 50 SubscriberCallbackInfo * replay_callback_info = 51 51 new SubscriberCallbackInfo (); … … 54 54 this->spread.subscribe ("", this->replication_group, REPLAY_MESSAGE_TYPE, 55 55 replay_callback_info); 56 this->spread.subscribe ("", this->spread.get_private_group (), 57 REPLAY_MESSAGE_TYPE, replay_callback_info); 56 58 } 57 59 -
trunk/thrucommon/src/Spread.cpp
r312 r313 167 167 } 168 168 169 // send out any pending message170 this->drain_pending ();171 172 169 service service_type = 0; 173 170 char sender[MAX_GROUP_NAME]; … … 182 179 while (!count || i < count) 183 180 { 181 // drain any pending messages 182 this->drain_pending (); 183 184 184 // created here so that if we have to grow them they'll be recreated 185 185 // larger, see errorhandling below … … 272 272 } 273 273 } 274 // drain any pending messages275 this->drain_pending ();276 274 } 277 275 LOG4CXX_DEBUG (logger, "run: done"); -
trunk/thrucommon/src/Spread.h
r312 r313 80 80 } 81 81 82 const std::set<std::string> get_group_members (std::string group) 83 { 84 return this->groups[group]; 85 } 86 82 87 private: 83 88 static log4cxx::LoggerPtr logger; -
trunk/thrudoc/src/ReplicationBackend.cpp
r311 r313 409 409 } 410 410 411 // TODO: handle when we don't hear from our replay host for a while... 411 412 bool ReplicationBackend::handle_replay_message 412 (const std::string & /* sender */,413 (const std::string & sender, 413 414 const std::vector<std::string> & /* groups */, 414 415 const int /* message_type */, const char * message, const int message_len) 415 416 { 416 417 LOG4CXX_DEBUG (logger, "handle_replay_message:"); 418 419 // first one to answer becomes the person we'll ask in the future 420 if (this->current_replay_name.empty ()) 421 { 422 this->current_replay_name = sender; 423 } 424 else if (this->current_replay_name != sender) 425 { 426 // this isn't from our offical replay host, skip it 427 return true; 428 } 417 429 418 430 // TODO: don't recreate these every time... … … 454 466 // actually happened since the last message we recorded 455 467 this->listener_live = true; 468 // unset our current_replay_name, so we'll get one again next time 469 this->current_replay_name = ""; 456 470 } 457 471 … … 579 593 void ReplicationBackend::request_next (string uuid) 580 594 { 595 string who_to_ask = this->current_replay_name; 596 if (who_to_ask.empty ()) 597 { 598 // we don't have a current replay from, so ask them all, first one to 599 // respond will be the the new replay host 600 who_to_ask = this->replication_group; 601 } 602 LOG4CXX_DEBUG (logger, "request_next: who_to_ask=" + who_to_ask); 603 581 604 // we don't want our own message back here... 582 this->spread.send (RELIABLE_MESS | SELF_DISCARD, this->replication_group,605 this->spread.send (RELIABLE_MESS | SELF_DISCARD, who_to_ask, 583 606 REPLAY_MESSAGE_TYPE, uuid.c_str (), uuid.length ()); 584 607 } -
trunk/thrudoc/src/ReplicationBackend.h
r311 r313 73 73 Spread spread; 74 74 std::string replication_group; 75 std::string current_replay_name; 75 76 std::string replication_status_file; 76 77 int replication_status_flush_frequency;
