Changeset 283

Show
Ignore:
Timestamp:
02/18/08 09:06:30 (9 months ago)
Author:
rm
Message:

bring spread replication in to things. rename to just ReplicationBackend?.

Location:
trunk/thrudoc
Files:
3 modified
2 moved

Legend:

Unmodified
Added
Removed
  • trunk/thrudoc/examples/thrudoc.conf

    r234 r283  
    3838#SPREAD_GROUP = thrudoc 
    3939 
     40# spread replication backend 
     41#REPLICATION_NAME = 4803@localhost 
     42#REPLICATION_PRIVATE_NAME = ds_s1 
     43#REPLICATION_GROUP = thrudoc 
     44#REPLICATION_STATUS_FILE=replication_status 
     45#REPLICATION_STATUS_FLUSH_FREQUENCY=30 
     46 
    4047#ENABLE_BLOOM_FILTER=1 
    4148 
  • trunk/thrudoc/src/Makefile.am

    r251 r283  
    1111include_thrudocdir = $(includedir)/thrudoc 
    1212include_thrudoc_HEADERS = \ 
    13                              app_helpers.h      \ 
    14                              BDBBackend.h       \ 
    15                              BloomBackend.h     \ 
    16                              DiskBackend.h      \ 
    17                              LogBackend.h       \ 
    18                              MemcachedBackend.h \ 
    19                              MySQLBackend.h     \ 
    20                              NBackend.h         \ 
    21                              NullBackend.h      \ 
    22                              S3Backend.h        \ 
    23                              SpreadBackend.h    \ 
    24                              StatsBackend.h     \ 
     13                             app_helpers.h              \ 
     14                             BDBBackend.h               \ 
     15                             BloomBackend.h             \ 
     16                             DiskBackend.h              \ 
     17                             LogBackend.h               \ 
     18                             MemcachedBackend.h         \ 
     19                             MySQLBackend.h             \ 
     20                             NBackend.h                 \ 
     21                             NullBackend.h              \ 
     22                             ReplicationBackend.h       \ 
     23                             S3Backend.h                \ 
     24                             SpreadBackend.h            \ 
     25                             StatsBackend.h             \ 
    2526                             ThrudocBackend.h 
    2627 
     
    4041                  NBackend.cpp                          \ 
    4142                  NullBackend.cpp                       \ 
     43                  ReplicationBackend.cpp                \ 
    4244                  S3Backend.cpp                         \ 
    4345                  StatsBackend.cpp                      \ 
  • trunk/thrudoc/src/ReplicationBackend.cpp

    r280 r283  
    77#if HAVE_LIBSPREAD && HAVE_LIBUUID 
    88 
    9 #include "SpreadReplicationBackend.h" 
     9#include "ReplicationBackend.h" 
    1010 
    1111#include <sys/types.h> 
     
    2121#define UUID_LEN 37 
    2222#define MESSAGE_OVERHEAD 4 
    23 #define SPREAD_BACKEND_MAX_MESSAGE_SIZE MAX_BUCKET_SIZE + MAX_KEY_SIZE + MAX_VALUE_SIZE + UUID_LEN + MESSAGE_OVERHEAD 
     23#define REPLICATION_BACKEND_MAX_MESSAGE_SIZE MAX_BUCKET_SIZE + MAX_KEY_SIZE + MAX_VALUE_SIZE + UUID_LEN + MESSAGE_OVERHEAD 
    2424 
    2525#define ORIG_MESSAGE_TYPE 1 
     
    3535string SP_error_to_string (int error); 
    3636 
    37 class SpreadReplicationWait  
     37class ReplicationWait  
    3838{ 
    3939    public: 
    4040        // uuid is just for logging/debugging purposes 
    41         SpreadReplicationWait (string uuid, uint32_t max_wait) 
     41        ReplicationWait (string uuid, uint32_t max_wait) 
    4242        { 
    4343            char buf[128]; 
    44             sprintf (buf, "SpreadReplicationWait: uuid=%s, max_wait=%d",  
     44            sprintf (buf, "ReplicationWait: uuid=%s, max_wait=%d",  
    4545                     uuid.c_str (), max_wait); 
    4646            LOG4CXX_DEBUG (logger, buf); 
     
    5656        } 
    5757 
    58         ~SpreadReplicationWait () 
    59         { 
    60             LOG4CXX_DEBUG (logger, "~SpreadReplicationWait: uuid=" +  
     58        ~ReplicationWait () 
     59        { 
     60            LOG4CXX_DEBUG (logger, "~ReplicationWait: uuid=" +  
    6161                           this->uuid); 
    6262            pthread_cond_destroy (&this->condition); 
     
    132132}; 
    133133 
    134 class SpreadReplicationMessage 
     134class ReplicationMessage 
    135135{ 
    136136    public: 
     
    138138        { 
    139139            max_groups = 5; 
    140             buf_size = SPREAD_BACKEND_MAX_MESSAGE_SIZE; 
     140            buf_size = REPLICATION_BACKEND_MAX_MESSAGE_SIZE; 
    141141 
    142142            buf_len = SP_receive (spread_mailbox, &service_type, sender, 
     
    180180        } 
    181181 
    182         int parse (const string & spread_private_group) 
     182        int parse (const string & replication_private_group) 
    183183        { 
    184184            // TODO: need better/safer parsing than scanf... maybe thrift ser. 
     
    196196            } 
    197197            else if (type == REPLAY_MESSAGE_TYPE && 
    198                      spread_private_group == groups[0]) 
     198                     replication_private_group == groups[0]) 
    199199            { 
    200200                LOG4CXX_DEBUG (logger, "parse: replay"); 
     
    276276        int buf_size; 
    277277        int buf_len; 
    278         char buf[SPREAD_BACKEND_MAX_MESSAGE_SIZE]; 
     278        char buf[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 
    279279 
    280280        // repli command stuff 
     
    287287 
    288288// private 
    289 LoggerPtr SpreadReplicationWait::logger (Logger::getLogger ("SpreadReplicationWait")); 
    290 LoggerPtr SpreadReplicationMessage::logger (Logger::getLogger ("SpreadReplicationMessage")); 
    291 LoggerPtr SpreadReplicationBackend::logger (Logger::getLogger ("SpreadReplicationBackend")); 
    292  
    293 SpreadReplicationBackend::SpreadReplicationBackend (shared_ptr<ThrudocBackend> backend, 
    294                                                     const string & spread_name, 
    295                                                     const string & spread_private_name, 
    296                                                     const string & spread_group) 
    297 { 
    298     LOG4CXX_INFO (logger, string ("SpreadReplicationBackend: spread_name=") + 
    299                   spread_name + ", spread_private_name=" + 
    300                   spread_private_name + ", spread_group=" + spread_group); 
     289LoggerPtr ReplicationWait::logger (Logger::getLogger ("ReplicationWait")); 
     290LoggerPtr ReplicationMessage::logger (Logger::getLogger ("ReplicationMessage")); 
     291LoggerPtr ReplicationBackend::logger (Logger::getLogger ("ReplicationBackend")); 
     292 
     293ReplicationBackend::ReplicationBackend (shared_ptr<ThrudocBackend> backend, 
     294                                        const string & replication_name, 
     295                                        const string & replication_private_name, 
     296                                        const string & replication_group, 
     297                                        const string & replication_status_file, 
     298                                        const int replication_status_flush_frequency) 
     299{ 
     300    char buf[1024]; 
     301    sprintf (buf, "ReplicationBackend: replication_name=%s, replication_private_name=%s, replication_group=%s, replication_status_file=%s, replication_status_flush_frequency=%d", 
     302             replication_name.c_str (), replication_private_name.c_str (), 
     303             replication_group.c_str (), replication_status_file.c_str (), 
     304             replication_status_flush_frequency); 
     305    LOG4CXX_INFO (logger, buf); 
    301306 
    302307    this->set_backend (backend); 
    303     this->spread_name = spread_name; 
    304     this->spread_private_name = spread_private_name; 
    305     this->spread_group = spread_group; 
     308    this->replication_name = replication_name; 
     309    this->replication_private_name = replication_private_name; 
     310    this->replication_group = replication_group; 
     311    this->replication_status_file = replication_status_file; 
     312    this->replication_status_flush_frequency = replication_status_flush_frequency; 
    306313 
    307314    char private_group[MAX_GROUP_NAME]; 
    308     int ret = SP_connect (this->spread_name.c_str (), 
    309                           this->spread_private_name.c_str (), 0, 1, 
     315    int ret = SP_connect (this->replication_name.c_str (), 
     316                          this->replication_private_name.c_str (), 0, 1, 
    310317                          &this->spread_mailbox, private_group); 
    311318    if (ret < 0) 
     
    318325    } 
    319326 
    320     this->spread_private_group = string (private_group); 
    321     LOG4CXX_INFO (logger, "SpreadBackend: private_group=" + 
    322                   this->spread_private_group); 
    323  
    324     ret = SP_join (this->spread_mailbox, this->spread_group.c_str ()); 
     327    this->replication_private_group = string (private_group); 
     328    LOG4CXX_INFO (logger, "ReplicationBackend: private_group=" + 
     329                  this->replication_private_group); 
     330 
     331    ret = SP_join (this->spread_mailbox, this->replication_group.c_str ()); 
    325332    if (ret < 0) 
    326333    { 
     
    333340 
    334341    int fd; 
    335     fd = ::open ("last_uuid", 0x0, S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH); 
     342    fd = ::open (this->replication_status_file.c_str (), 0x0,  
     343                 S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH); 
    336344    listener_live = true; // we're live unless we load a last_uuid in a sec 
    337345    if (fd) 
    338346    { 
     347        LOG4CXX_DEBUG (logger, "ReplicationBackend: opened status file=" + 
     348                       this->replication_status_file); 
    339349        char buf[64] = ""; 
    340350        ::read (fd, buf, 64); 
    341         last_uuid = buf; 
     351        this->last_uuid = buf; 
    342352        ::close (fd); 
    343         if (!last_uuid.empty ()) 
     353        if (!this->last_uuid.empty ()) 
    344354        { 
    345355            listener_live = false; 
    346             request_next (last_uuid); 
    347             LOG4CXX_INFO (logger, "SpreadReplicationBackend: loaded last_uuid=" + 
    348                           last_uuid); 
     356            request_next (this->last_uuid); 
     357            LOG4CXX_INFO (logger, "ReplicationBackend: found last_uuid=" + 
     358                          this->last_uuid); 
    349359        } 
    350360    } 
     
    355365                       (void *)this) != 0) 
    356366    { 
    357         char error[] = "SpreadReplicationBackend: start_listener_thread failed\n"; 
     367        char error[] = "ReplicationBackend: start_listener_thread failed\n"; 
    358368        LOG4CXX_ERROR (logger, error); 
    359369        ThrudocException e; 
     
    363373} 
    364374 
    365 SpreadReplicationBackend::~SpreadReplicationBackend () 
    366 { 
    367     LOG4CXX_INFO (logger, "~SpreadReplicationBackend"); 
     375ReplicationBackend::~ReplicationBackend () 
     376{ 
     377    LOG4CXX_INFO (logger, "~ReplicationBackend"); 
    368378    // we're no longer live, don't accept connections 
    369379    this->listener_live = false; 
     
    400410// TODO: implement circuit breaker pattern around spread... 
    401411 
    402 void SpreadReplicationBackend::put (const string & bucket, const string & key, 
     412void ReplicationBackend::put (const string & bucket, const string & key, 
    403413                                    const string & value) 
    404414{ 
    405     char msg[SPREAD_BACKEND_MAX_MESSAGE_SIZE]; 
     415    char msg[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 
    406416    string uuid = generate_uuid (); 
    407     snprintf (msg, SPREAD_BACKEND_MAX_MESSAGE_SIZE, "%s p %s %s %s", 
     417    snprintf (msg, REPLICATION_BACKEND_MAX_MESSAGE_SIZE, "%s p %s %s %s", 
    408418              uuid.c_str (), bucket.c_str (), key.c_str (), value.c_str ()); 
    409419 
     
    411421} 
    412422 
    413 void SpreadReplicationBackend::remove (const string & bucket, 
     423void ReplicationBackend::remove (const string & bucket, 
    414424                                       const string & key ) 
    415425{ 
    416     char msg[SPREAD_BACKEND_MAX_MESSAGE_SIZE]; 
     426    char msg[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 
    417427    string uuid = generate_uuid (); 
    418     snprintf (msg, SPREAD_BACKEND_MAX_MESSAGE_SIZE, "%s r %s %s", 
     428    snprintf (msg, REPLICATION_BACKEND_MAX_MESSAGE_SIZE, "%s r %s %s", 
    419429              uuid.c_str (), bucket.c_str (), key.c_str ()); 
    420430 
     
    422432} 
    423433 
    424 string SpreadReplicationBackend::admin (const string & op, const string & data) 
     434string ReplicationBackend::admin (const string & op, const string & data) 
    425435{ 
    426436    if (op == "replay_from") 
     
    430440        return "done"; 
    431441    } 
    432     char msg[SPREAD_BACKEND_MAX_MESSAGE_SIZE]; 
     442    char msg[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 
    433443    string uuid = generate_uuid (); 
    434     snprintf (msg, SPREAD_BACKEND_MAX_MESSAGE_SIZE, "%s a %s %s", 
     444    snprintf (msg, REPLICATION_BACKEND_MAX_MESSAGE_SIZE, "%s a %s %s", 
    435445              uuid.c_str (), op.c_str (), data.c_str ()); 
    436446 
     
    438448} 
    439449 
    440 string SpreadReplicationBackend::send_and_wait_for_resp (const char * msg, 
     450string ReplicationBackend::send_and_wait_for_resp (const char * msg, 
    441451                                                         string uuid) 
    442452{ 
    443453    LOG4CXX_DEBUG (logger, "wait_for_resp: begin uuid=" + uuid); 
    444454    string ret; 
    445     shared_ptr<SpreadReplicationWait> wait (new SpreadReplicationWait (uuid,  
     455    shared_ptr<ReplicationWait> wait (new ReplicationWait (uuid,  
    446456                                                                       2)); 
    447457    // install wait 
     
    451461    } 
    452462    // send out multi-cast message 
    453     SP_multicast (this->spread_mailbox, SAFE_MESS, this->spread_group.c_str (), 
     463    SP_multicast (this->spread_mailbox, SAFE_MESS,  
     464                  this->replication_group.c_str (), 
    454465                  ORIG_MESSAGE_TYPE, strlen (msg), msg); 
    455466    // wait here until we have the result 
     
    471482} 
    472483 
    473 void SpreadReplicationBackend::validate (const std::string & bucket, 
     484void ReplicationBackend::validate (const std::string & bucket, 
    474485                                         const std::string * key, 
    475486                                         const std::string * value) 
     
    504515} 
    505516 
    506 string SpreadReplicationBackend::generate_uuid () 
     517string ReplicationBackend::generate_uuid () 
    507518{ 
    508519    uuid_t uuid; 
     
    513524} 
    514525 
    515 void * SpreadReplicationBackend::start_listener_thread (void * ptr) 
     526void * ReplicationBackend::start_listener_thread (void * ptr) 
    516527{ 
    517528    LOG4CXX_INFO (logger, "start_listener_thread: "); 
    518     (((SpreadReplicationBackend*)ptr)->listener_thread_run ()); 
     529    (((ReplicationBackend*)ptr)->listener_thread_run ()); 
    519530    return NULL; 
    520531} 
    521532 
    522 void SpreadReplicationBackend::listener_thread_run () 
     533void ReplicationBackend::listener_thread_run () 
    523534{ 
    524535    struct pollfd fds[] = { {this->spread_mailbox, POLLIN, 0} }; 
     
    541552                this->listener_thread_go = false; 
    542553            } 
    543             if (last_flush + 30 < time (0)) 
    544             { 
    545                 LOG4CXX_DEBUG (logger, "flushing last_uuid=" + last_uuid); 
     554            if ((last_flush + this->replication_status_flush_frequency) <  
     555                time (0)) 
     556            { 
     557                LOG4CXX_DEBUG (logger, "listener_thread_run: flushing last_uuid=" + 
     558                               this->last_uuid); 
    546559                int fd; 
    547                 fd = ::open ("last_uuid", O_RDWR | O_TRUNC | O_CREAT,  
     560                fd = ::open (this->replication_status_file.c_str (), 
     561                             O_RDWR | O_TRUNC | O_CREAT,  
    548562                             S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH); 
    549563                ::write (fd, this->last_uuid.c_str (),  
     
    564578} 
    565579 
    566 void SpreadReplicationBackend::handle_message () 
    567 { 
    568     SpreadReplicationMessage * message = new SpreadReplicationMessage (); 
     580void ReplicationBackend::handle_message () 
     581{ 
     582    ReplicationMessage * message = new ReplicationMessage (); 
    569583    message->receive (this->spread_mailbox); 
    570     int type = message->parse (this->spread_private_group); 
     584    int type = message->parse (this->replication_private_group); 
    571585    if (type == ORIG_MESSAGE_TYPE) 
    572586    { 
     
    583597            while (!pending_messages.empty ()) 
    584598            { 
    585                 SpreadReplicationMessage * drain = pending_messages.front (); 
     599                ReplicationMessage * drain = pending_messages.front (); 
    586600                pending_messages.pop (); 
    587601                LOG4CXX_DEBUG (logger, string ("handle_message: drain.uuid=") + 
     
    608622        { 
    609623            // it's a message 
    610             SpreadReplicationMessage * first_queued = pending_messages.front (); 
     624            ReplicationMessage * first_queued = pending_messages.front (); 
    611625            if (first_queued == NULL || 
    612626                first_queued->get_uuid () != message->get_uuid ()) 
     
    645659} 
    646660 
    647 void SpreadReplicationBackend::do_message (SpreadReplicationMessage * message) 
     661void ReplicationBackend::do_message (ReplicationMessage * message) 
    648662{ 
    649663    string ret; 
     
    696710 
    697711    // if we sent this message signal to the waiting thread that it's complete 
    698     if (this->spread_private_group == message->get_sender ()) 
     712    if (this->replication_private_group == message->get_sender ()) 
    699713    { 
    700714        RWGuard g (this->pending_waits_mutex, false); 
    701         std::map<std::string, boost::shared_ptr<SpreadReplicationWait> >::iterator 
     715        std::map<std::string, boost::shared_ptr<ReplicationWait> >::iterator 
    702716            i = pending_waits.find (message->get_uuid ()); 
    703717        if (i != pending_waits.end ()) 
     
    708722} 
    709723 
    710 void SpreadReplicationBackend::request_next (string uuid) 
     724void ReplicationBackend::request_next (string uuid) 
    711725{ 
    712726    // we don't want our own message back here... 
    713727    SP_multicast (this->spread_mailbox, RELIABLE_MESS | SELF_DISCARD, 
    714                   this->spread_group.c_str (), 
     728                  this->replication_group.c_str (), 
    715729                  REPLAY_MESSAGE_TYPE, uuid.length (), uuid.c_str ()); 
    716730} 
  • trunk/thrudoc/src/ReplicationBackend.h

    r280 r283  
    33 **/ 
    44 
    5 #ifndef __SPREAD_REPLICIATION_BACKEND_H_ 
    6 #define __SPREAD_REPLICIATION_BACKEND_H_ 
     5#ifndef _REPLICIATION_BACKEND_H_ 
     6#define _REPLICIATION_BACKEND_H_ 
    77 
    88#if HAVE_LIBMEMCACHED && HAVE_LIBUUID 
     
    1818#include "ThrudocPassthruBackend.h" 
    1919 
    20 class SpreadReplicationWait; 
    21 class SpreadReplicationMessage; 
     20class ReplicationWait; 
     21class ReplicationMessage; 
    2222 
    2323/* NOTE: this is not exactly a normal passthrough backend even tho it kinda 
    2424 * looks like one. we use the passthrough, but there's spread in between 
    2525 * passing things down */ 
    26 class SpreadReplicationBackend : public ThrudocPassthruBackend 
     26class ReplicationBackend : public ThrudocPassthruBackend 
    2727{ 
    2828    public: 
    29         SpreadReplicationBackend (boost::shared_ptr<ThrudocBackend> backend, 
    30                                   const std::string & spread_name,  
    31                                   const std::string & spread_private_name, 
    32                                   const std::string & spread_group); 
    33         ~SpreadReplicationBackend (); 
     29        ReplicationBackend (boost::shared_ptr<ThrudocBackend> backend, 
     30                            const std::string & replication_name,  
     31                            const std::string & replication_private_name, 
     32                            const std::string & replication_group, 
     33                            const std::string & replication_status_file, 
     34                            const int replication_status_flush_frequency); 
     35        ~ReplicationBackend (); 
    3436 
    3537        void put (const std::string & bucket, const std::string & key, 
     
    4850        static void * start_listener_thread (void * ptr); 
    4951 
    50         std::string spread_name; 
    51         std::string spread_private_name; 
    52         std::string spread_group; 
    53         std::string spread_private_group; 
     52        std::string replication_name; 
     53        std::string replication_private_name; 
     54        std::string replication_group; 
     55        std::string replication_private_group; 
     56        std::string replication_status_file; 
     57        int replication_status_flush_frequency; 
    5458        mailbox spread_mailbox; 
    5559        pthread_t listener_thread; 
     
    5761        bool listener_live; 
    5862        std::string last_uuid; 
    59         std::queue<SpreadReplicationMessage *> pending_messages; 
    60         std::map<std::string, boost::shared_ptr<SpreadReplicationWait> >  
     63        std::queue<ReplicationMessage *> pending_messages; 
     64        std::map<std::string, boost::shared_ptr<ReplicationWait> >  
    6165            pending_waits; 
    6266        facebook::thrift::concurrency::ReadWriteMutex pending_waits_mutex; 
     
    6569        void listener_thread_run (); 
    6670        void handle_message (); 
    67         void do_message (SpreadReplicationMessage * message); 
     71        void do_message (ReplicationMessage * message); 
    6872        void request_next (std::string message); 
    6973}; 
     
    7175#endif /* HAVE_LIBMEMCACHED */ 
    7276 
    73 #endif 
     77#endif /* _REPLICIATION_BACKEND_H_ */ 
  • trunk/thrudoc/src/app_helpers.cpp

    r230 r283  
    1818#include "S3Backend.h" 
    1919#include "SpreadBackend.h" 
     20#include "ReplicationBackend.h" 
    2021#include "StatsBackend.h" 
    2122#include "ThrudocBackend.h" 
     
    174175    string spread_private_name = 
    175176        ConfigManager->read<string>("SPREAD_PRIVATE_NAME", ""); 
     177    // Spread replication (sorta) passthrough 
     178    string replication_private_name = 
     179        ConfigManager->read<string>("REPLICATION_PRIVATE_NAME", ""); 
    176180#if HAVE_LIBSPREAD 
    177181    string spread_name = 
     
    184188            (new SpreadBackend (backend, spread_name, spread_private_name, 
    185189                                spread_group)); 
     190 
     191    string replication_name = 
     192        ConfigManager->read<string>("REPLICATION_NAME", "4803"); 
     193    string replication_group = 
     194        ConfigManager->read<string>("REPLICATION_GROUP", "thrudoc"); 
     195    string replication_status_file = 
     196        ConfigManager->read<string>("REPLICATION_STATUS_FILE",  
     197                                    "replication_status"); 
     198    int replication_status_flush_frequency = 
     199        ConfigManager->read<int>("REPLICATION_STATUS_FLUSH_FREQUENCY", 30); 
     200 
     201    if (!replication_private_name.empty ()) 
     202        backend = shared_ptr<ThrudocBackend> 
     203            (new ReplicationBackend (backend, replication_name,  
     204                                     replication_private_name, 
     205                                     replication_group,  
     206                                     replication_status_file, 
     207                                     replication_status_flush_frequency)); 
    186208#else 
    187209    if (!spread_private_name.empty ()) 
     
    189211        LOG4CXX_ERROR (logger, "SPREAD_PRIVATE_NAME supplied, but spread support not complied in"); 
    190212        fprintf (stderr, "SPREAD_PRIVATE_NAME supplied, but spread support not complied in\n"); 
     213        exit (1); 
     214    } 
     215    if (!replication_private_name.empty ()) 
     216    { 
     217        LOG4CXX_ERROR (logger, "REPLICATION_PRIVATE_NAME supplied, but spread support not complied in"); 
     218        fprintf (stderr, "REPLCATION_PRIVATE_NAME supplied, but spread support not complied in\n"); 
    191219        exit (1); 
    192220    }