Changeset 313

Show
Ignore:
Timestamp:
02/27/08 00:43:56 (9 months ago)
Author:
rm
Message:

Replication:

  • listen for replay next requests on both the main group and direct to the recorder.
  • improve Spread's drain behavior in run
  • provide access to group membership in Spread
  • ReplicationBackend? now keeps track of who it's syncing from, asks everyone the first time and uses the first to answer.
Location:
trunk
Files:
1 added
5 modified

Legend:

Unmodified
Added
Removed
  • trunk/thrucommon/src/ReplicationRecorder.cpp

    r311 r313  
    4747                            live_callback_info); 
    4848 
    49     // subscribe to "replay" messages 
     49    // subscribe to "replay" messages, both broadcast and direct 
    5050    SubscriberCallbackInfo * replay_callback_info =  
    5151        new SubscriberCallbackInfo (); 
     
    5454    this->spread.subscribe ("", this->replication_group, REPLAY_MESSAGE_TYPE, 
    5555                            replay_callback_info); 
     56    this->spread.subscribe ("", this->spread.get_private_group (), 
     57                            REPLAY_MESSAGE_TYPE, replay_callback_info); 
    5658} 
    5759 
  • trunk/thrucommon/src/Spread.cpp

    r312 r313  
    167167    } 
    168168 
    169     // send out any pending message 
    170     this->drain_pending (); 
    171  
    172169    service service_type = 0; 
    173170    char sender[MAX_GROUP_NAME]; 
     
    182179    while (!count || i < count) 
    183180    { 
     181        // drain any pending messages 
     182        this->drain_pending (); 
     183 
    184184        // created here so that if we have to grow them they'll be recreated 
    185185        // larger, see errorhandling below 
     
    272272            } 
    273273        } 
    274         // drain any pending messages 
    275         this->drain_pending (); 
    276274    } 
    277275    LOG4CXX_DEBUG (logger, "run:    done"); 
  • trunk/thrucommon/src/Spread.h

    r312 r313  
    8080        } 
    8181 
     82        const std::set<std::string> get_group_members (std::string group) 
     83        { 
     84            return this->groups[group]; 
     85        } 
     86 
    8287    private: 
    8388        static log4cxx::LoggerPtr logger; 
  • trunk/thrudoc/src/ReplicationBackend.cpp

    r311 r313  
    409409} 
    410410 
     411// TODO: handle when we don't hear from our replay host for a while... 
    411412bool ReplicationBackend::handle_replay_message  
    412 (const std::string & /* sender */, 
     413(const std::string & sender, 
    413414 const std::vector<std::string> & /* groups */, 
    414415 const int /* message_type */, const char * message, const int message_len) 
    415416{ 
    416417    LOG4CXX_DEBUG (logger, "handle_replay_message:"); 
     418 
     419    // first one to answer becomes the person we'll ask in the future 
     420    if (this->current_replay_name.empty ()) 
     421    { 
     422        this->current_replay_name = sender; 
     423    } 
     424    else if (this->current_replay_name != sender) 
     425    { 
     426        // this isn't from our offical replay host, skip it 
     427        return true; 
     428    } 
    417429 
    418430    // TODO: don't recreate these every time... 
     
    454466        // actually happened since the last message we recorded 
    455467        this->listener_live = true; 
     468        // unset our current_replay_name, so we'll get one again next time 
     469        this->current_replay_name = ""; 
    456470    } 
    457471 
     
    579593void ReplicationBackend::request_next (string uuid) 
    580594{ 
     595    string who_to_ask = this->current_replay_name; 
     596    if (who_to_ask.empty ()) 
     597    { 
     598        // we don't have a current replay from, so ask them all, first one to 
     599        // respond will be the the new replay host 
     600        who_to_ask = this->replication_group; 
     601    } 
     602    LOG4CXX_DEBUG (logger, "request_next: who_to_ask=" + who_to_ask); 
     603 
    581604    // we don't want our own message back here... 
    582     this->spread.send (RELIABLE_MESS | SELF_DISCARD, this->replication_group, 
     605    this->spread.send (RELIABLE_MESS | SELF_DISCARD, who_to_ask, 
    583606                       REPLAY_MESSAGE_TYPE, uuid.c_str (), uuid.length ()); 
    584607} 
  • trunk/thrudoc/src/ReplicationBackend.h

    r311 r313  
    7373        Spread spread; 
    7474        std::string replication_group; 
     75        std::string current_replay_name; 
    7576        std::string replication_status_file; 
    7677        int replication_status_flush_frequency;