Changeset 304

Show
Ignore:
Timestamp:
02/25/08 06:28:26 (9 months ago)
Author:
rm
Message:

more work on SpreadConnection?, getting it working. initial introduction of unit test infrastructure, cppunit.

Location:
trunk/thrucommon
Files:
4 added
5 modified

Legend:

Unmodified
Added
Removed
  • trunk/thrucommon/Makefile.am

    r182 r304  
    1 SUBDIRS = src 
     1SUBDIRS = src tests 
  • trunk/thrucommon/configure.in

    r251 r304  
    1313AC_PROG_LIBTOOL 
    1414AC_PROG_MAKE_SET 
     15 
     16# libcppunit, optional 
     17AC_ARG_WITH(tests, 
     18    AC_HELP_STRING([--with-tests], 
     19                   [build test suite support, requires cppunit (default is no]), 
     20    [with_memcached=$withval], 
     21    [with_memcached=no]) 
     22if test "x$with_tests" = "xyes"; then 
     23    AM_PATH_CPPUNIT(1.9.6) 
     24    if test "x$no_cppunit" != "xyes" ; then 
     25        AC_DEFINE([HAVE_CPPUNIT], [1], [Define to 1 if you have the cppunit library (-lcppunit)]) 
     26    fi 
     27fi 
    1528 
    1629AC_LANG(C++) 
     
    100113AC_CONFIG_FILES([Makefile 
    101114                 src/Makefile 
    102                  src/thrucommon.pc]) 
     115                 src/thrucommon.pc 
     116                 tests/Makefile]) 
    103117AC_OUTPUT 
  • trunk/thrucommon/src/Makefile.am

    r281 r304  
    1717                             ThruFileTransport.h        \ 
    1818                             RecoveryManager.h          \ 
     19                             SpreadConnection.h         \ 
    1920                             SpreadManager.h            \ 
    2021                             SpreadTaskFactory.h        \ 
     
    3536                           ThruFileTransport.cpp                \ 
    3637                           RecoveryManager.cpp                  \ 
     38                           SpreadConnection.cpp                 \ 
    3739                           SpreadManager.cpp                    \ 
    3840                           Transaction.cpp                      \ 
  • trunk/thrucommon/src/SpreadConnection.cpp

    r303 r304  
    22#include "SpreadConnection.h" 
    33 
     4using namespace log4cxx; 
    45using namespace std; 
    56 
    67string SP_error_to_string (int error); 
     8 
     9LoggerPtr SpreadConnection::logger (Logger::getLogger ("SpreadConnection")); 
    710 
    811SpreadConnection::SpreadConnection (const string & name, 
     
    3942SpreadConnection::~SpreadConnection () 
    4043{ 
     44    LOG4CXX_INFO (logger, "~SpreadConnection"); 
    4145    map<string, vector<string> >::iterator i; 
    4246    for (i = this->groups.begin (); 
     
    5155void SpreadConnection::join (const std::string & group) 
    5256{ 
     57    LOG4CXX_DEBUG (logger, "join: group=" + group); 
    5358    map<string, vector<string> >::iterator i; 
    5459    i = this->groups.find (group.c_str ()); 
    5560    if (i != this->groups.end ()) 
    5661    { 
    57         // TODO: throw an exception 
     62        LOG4CXX_DEBUG (logger, "    already a member"); 
    5863    } 
    5964    else 
     
    7075        // that's handled elsewhere, as are membership changes 
    7176        this->groups[group]; 
     77        LOG4CXX_DEBUG (logger, "    joined"); 
    7278    } 
    7379} 
     
    7581void SpreadConnection::leave (const std::string & group) 
    7682{ 
     83    LOG4CXX_DEBUG (logger, "leave: group=" + group); 
    7784    map<string, vector<string> >::iterator i; 
    7885    i = this->groups.find (group.c_str ()); 
     
    8087    { 
    8188        SP_leave (this->mbox, group.c_str ()); 
     89        // TODO: this will unsubscribe everyone... 
    8290        this->groups.erase (i); 
     91        LOG4CXX_DEBUG (logger, "    left"); 
    8392    } 
    8493} 
     
    8695void SpreadConnection::subscribe (const string & sender, const string & group, 
    8796                                  const int message_type, 
    88                                   subscriber_callback callback) 
    89 { 
     97                                  SubscriberCallbackInfo * callback) 
     98{ 
     99    char buf[256]; 
     100    sprintf (buf, "subscribe: sender=%s, group=%s, message_type=%d",  
     101             sender.c_str (), group.c_str (), message_type); 
     102    LOG4CXX_DEBUG (logger, buf); 
    90103    if (!group.empty ()) 
    91104    { 
     
    101114                              const char * message, const int message_len) 
    102115{ 
     116    if (logger->isDebugEnabled ()) 
     117    { 
     118        char buf[128]; 
     119        sprintf (buf, "queue: group=%s, message_type=%d", group.c_str (), 
     120                 message_type); 
     121        LOG4CXX_DEBUG (logger, buf); 
     122    } 
    103123    QueuedMessage * queued_message = 
    104124        (QueuedMessage*)malloc (sizeof (QueuedMessage)); 
    105     queued_message->group = group; 
     125    queued_message->group = strndup (group.c_str (), group.length ()); 
    106126    queued_message->message_type = message_type; 
    107127    queued_message->message = strndup (message, message_len); 
     
    112132void SpreadConnection::run (int count) 
    113133{ 
     134    if (logger->isDebugEnabled ()) 
     135    { 
     136        char buf[64]; 
     137        sprintf (buf, "run: count=%d", count); 
     138        LOG4CXX_DEBUG (logger, buf); 
     139    } 
    114140    // send out any pending message 
    115141    this->drain_pending (); 
     
    129155    char buf[buf_size]; 
    130156 
    131     int i; 
     157    int i = 0; 
    132158    while (!count || i < count) 
    133159    { 
     160        LOG4CXX_DEBUG (logger, "run:    receiving"); 
    134161        buf_len = SP_receive (this->mbox, &service_type, sender, 
    135162                              max_groups, &num_groups, groups, &type, 
    136163                              &endian_mismatch, buf_size, buf); 
     164        // TODO: tmp 
     165        { 
     166            char buf[128]; 
     167            sprintf (buf, "recv: buf_len=%d, sender=%s, group=%s, type=%d", 
     168                     buf_len, sender, groups[0], type); 
     169            LOG4CXX_DEBUG (logger, buf); 
     170        } 
    137171        if (buf_len > 0) 
    138172        { 
     
    173207        // drain any pending messages 
    174208        this->drain_pending (); 
    175     } 
    176 } 
    177  
    178 void SpreadConnection::make_callbacks (vector<subscriber_callback> callbacks, 
    179                                        const string & sender,  
    180                                        const string & group, 
    181                                        const int message_type,  
    182                                        const char * message, 
    183                                        const int message_len) 
    184 { 
     209 
     210        i++; 
     211    } 
     212    LOG4CXX_DEBUG (logger, "run:    done"); 
     213} 
     214 
     215void SpreadConnection::make_callbacks 
     216(vector<SubscriberCallbackInfo *> callbacks, const string & sender, 
     217 const string & group, const int message_type, const char * message, 
     218 const int message_len) 
     219{ 
     220    if (logger->isDebugEnabled ()) 
     221    { 
     222        char buf[256]; 
     223        sprintf (buf, "make_callbacks: callbacks.size=%d, sender=%s, group=%s, message_type=%d", 
     224                 (int)callbacks.size (), sender.c_str (), group.c_str (),  
     225                 message_type); 
     226        LOG4CXX_DEBUG (logger, buf); 
     227    } 
    185228    bool ret; 
    186     vector<subscriber_callback>::iterator i; 
     229    vector<SubscriberCallbackInfo *>::iterator i; 
    187230    for (i = callbacks.begin (); i != callbacks.end (); i++) 
    188231    { 
    189         ret = (*i)(this, sender, group, message_type, message, message_len); 
     232        subscriber_callback call = (*i)->callback; 
     233        void * data = (*i)->data; 
     234        ret = (call)(this, sender, group, message_type, message,  
     235                     message_len, data); 
     236        ret = ((*i)->callback)(this, sender, group, message_type, message,  
     237                               message_len, (*i)->data); 
    190238        if (!ret) 
    191239            callbacks.erase (i); 
     
    197245                                 const int message_len) 
    198246{ 
     247    if (logger->isDebugEnabled ()) 
     248    { 
     249        char buf[128]; 
     250        sprintf (buf, "dispatch: sender=%s, group=%s, message_type=%d", 
     251                 sender.c_str (), group.c_str (), message_type); 
     252        LOG4CXX_DEBUG (logger, buf); 
     253    } 
     254 
    199255    // if we haven't installed any callbacks, there's no reason to continue 
    200256    if (this->subscriptions.empty ()) 
     
    202258 
    203259    // TODO: recieve self flag 
    204     if (this->private_group == sender) 
     260    if (this->private_group == sender && !this->receive_self) 
    205261        return; 
    206262 
     
    208264    // C++ is compared to perl where this is 8 lines of code :( 
    209265 
    210     map<string, map<string, map<int, vector<subscriber_callback> > > >::iterator 
    211         s = this->subscriptions.find (sender); 
     266    map<string, map<string, map<int,  
     267        vector<SubscriberCallbackInfo *> > > >::iterator s =  
     268            this->subscriptions.find (sender); 
    212269    if (s != this->subscriptions.end ()) 
    213270    { 
    214         map<string, map<int, vector<subscriber_callback> > >::iterator s_g = 
    215             (*s).second.find (group); 
     271        map<string, map<int,  
     272            vector<SubscriberCallbackInfo *> > >::iterator s_g = 
     273                (*s).second.find (group); 
    216274        if (s_g != (*s).second.end ()) 
    217275        { 
    218             map<int, vector<subscriber_callback> >::iterator s_g_t = 
     276            map<int, vector<SubscriberCallbackInfo *> >::iterator s_g_t = 
    219277                (*s_g).second.find (message_type); 
    220278            if (s_g_t != (*s_g).second.end ()) 
    221279            { 
    222280                // we have a sender, group, type match 
    223                 this->make_callbacks ((*s_g_t).second, sender, group,  
    224                                       message_type, message, message_len); 
    225             } 
    226             map<int, vector<subscriber_callback> >::iterator s_g_et =  
     281                this->make_callbacks ((*s_g_t).second, sender, group, 
     282                                      message_type, message, message_len); 
     283            } 
     284            map<int, vector<SubscriberCallbackInfo *> >::iterator s_g_et = 
    227285                (*s_g).second.find (-1); 
    228286            if (s_g_et != (*s_g).second.end ()) 
    229287            { 
    230288                // we have a sender, group, empty type match 
    231                 this->make_callbacks ((*s_g_et).second, sender, group,  
    232                                       message_type, message, message_len); 
    233             } 
    234         } 
    235         map<string, map<int, vector<subscriber_callback> > >::iterator s_eg = 
    236             (*s).second.find (""); 
     289                this->make_callbacks ((*s_g_et).second, sender, group, 
     290                                      message_type, message, message_len); 
     291            } 
     292        } 
     293        map<string, map<int,  
     294            vector<SubscriberCallbackInfo *> > >::iterator s_eg = 
     295                (*s).second.find (""); 
    237296        if (s_eg != (*s).second.end ()) 
    238297        { 
    239             map<int, vector<subscriber_callback> >::iterator s_eg_t =  
     298            map<int, vector<SubscriberCallbackInfo *> >::iterator s_eg_t = 
    240299                (*s_eg).second.find (message_type); 
    241300            if (s_eg_t != (*s_eg).second.end ()) 
    242301            { 
    243302                // we have a sender, empty group, type match 
    244                 this->make_callbacks ((*s_eg_t).second, sender, group,  
    245                                       message_type, message, message_len); 
    246             } 
    247             map<int, vector<subscriber_callback> >::iterator s_eg_et =  
     303                this->make_callbacks ((*s_eg_t).second, sender, group, 
     304                                      message_type, message, message_len); 
     305            } 
     306            map<int, vector<SubscriberCallbackInfo *> >::iterator s_eg_et = 
    248307                (*s_eg).second.find (-1); 
    249308            if (s_eg_et != (*s_eg).second.end ()) 
    250309            { 
    251310                // we have a sender, empty group, empty type match 
    252                 this->make_callbacks ((*s_eg_et).second, sender, group,  
    253                                       message_type, message, message_len); 
    254             } 
    255         } 
    256     } 
    257     map<string, map<string, map<int, vector<subscriber_callback> > > >::iterator 
    258         es = this->subscriptions.find (""); 
     311                this->make_callbacks ((*s_eg_et).second, sender, group, 
     312                                      message_type, message, message_len); 
     313            } 
     314        } 
     315    } 
     316    map<string, map<string, map<int,  
     317        vector<SubscriberCallbackInfo *> > > >::iterator 
     318            es = this->subscriptions.find (""); 
    259319    if (es != this->subscriptions.end ()) 
    260320    { 
    261         map<string, map<int, vector<subscriber_callback> > >::iterator es_g = 
    262             (*es).second.find (group); 
     321        map<string, map<int,  
     322            vector<SubscriberCallbackInfo *> > >::iterator es_g = 
     323                (*es).second.find (group); 
    263324        if (es_g != (*es).second.end ()) 
    264325        { 
    265             map<int, vector<subscriber_callback> >::iterator es_g_t =  
     326            map<int, vector<SubscriberCallbackInfo *> >::iterator es_g_t = 
    266327                (*es_g).second.find (message_type); 
    267328            if (es_g_t != (*es_g).second.end ()) 
    268329            { 
    269330                // we have a empty sender, group, type match 
    270                 this->make_callbacks ((*es_g_t).second, sender, group,  
    271                                       message_type, message, message_len); 
    272             } 
    273             map<int, vector<subscriber_callback> >::iterator es_g_et =  
     331                this->make_callbacks ((*es_g_t).second, sender, group, 
     332                                      message_type, message, message_len); 
     333            } 
     334            map<int, vector<SubscriberCallbackInfo *> >::iterator es_g_et = 
    274335                (*es_g).second.find (-1); 
    275336            if (es_g_et != (*es_g).second.end ()) 
    276337            { 
    277338                // we have a empty sender, group, empty type match 
    278                 this->make_callbacks ((*es_g_et).second, sender, group,  
    279                                       message_type, message, message_len); 
    280             } 
    281         } 
    282         map<string, map<int, vector<subscriber_callback> > >::iterator es_eg = 
     339                this->make_callbacks ((*es_g_et).second, sender, group, 
     340                                      message_type, message, message_len); 
     341            } 
     342        } 
     343        map<string, map<int,  
     344            vector<SubscriberCallbackInfo *> > >::iterator es_eg = 
    283345            (*es).second.find (""); 
    284346        if (es_eg != (*es).second.end ()) 
    285347        { 
    286             map<int, vector<subscriber_callback> >::iterator es_eg_t =  
     348            map<int, vector<SubscriberCallbackInfo *> >::iterator es_eg_t = 
    287349                (*es_eg).second.find (message_type); 
    288350            if (es_eg_t != (*es_eg).second.end ()) 
    289351            { 
    290352                // we have a empty sender, empty group, type match 
    291                 this->make_callbacks ((*es_eg_t).second, sender, group,  
    292                                       message_type, message, message_len); 
    293             } 
    294             map<int, vector<subscriber_callback> >::iterator es_eg_et =  
     353                this->make_callbacks ((*es_eg_t).second, sender, group, 
     354                                      message_type, message, message_len); 
     355            } 
     356            map<int, vector<SubscriberCallbackInfo *> >::iterator es_eg_et = 
    295357                (*es_eg).second.find (-1); 
    296358            if (es_eg_et != (*es_eg).second.end ()) 
    297359            { 
    298360                // we have a empty sender, empty group, empty type match 
    299                 this->make_callbacks ((*es_eg_et).second, sender, group,  
     361                this->make_callbacks ((*es_eg_et).second, sender, group, 
    300362                                      message_type, message, message_len); 
    301363            } 
     
    306368void SpreadConnection::drain_pending () 
    307369{ 
     370    if (logger->isDebugEnabled ()) 
     371    { 
     372        char buf[64]; 
     373        sprintf (buf, "drain_pending: pending_messages.size=%d",  
     374                 (int)this->pending_messages.size ()); 
     375        LOG4CXX_DEBUG (logger, buf); 
     376    } 
    308377    int ret; 
    309378    QueuedMessage * qm; 
    310379    while (!this->pending_messages.empty ()) 
    311380    { 
     381        LOG4CXX_DEBUG (logger, "drain_pending:    draining"); 
    312382        qm = this->pending_messages.front (); 
    313383        this->pending_messages.pop (); 
    314         ret = SP_multicast (this->mbox, SAFE_MESS | SELF_DISCARD,  
    315                             qm->group.c_str (), qm->message_type,  
     384        ret = SP_multicast (this->mbox, SAFE_MESS | SELF_DISCARD, 
     385                            qm->group, qm->message_type, 
    316386                            qm->message_len, qm->message); 
    317387        // TODO: error handling 
     388        free (qm->group); 
    318389        free (qm->message); 
    319390        free (qm); 
    320391    } 
     392    LOG4CXX_DEBUG (logger, "drain_pending: done"); 
    321393} 
    322394 
  • trunk/thrucommon/src/SpreadConnection.h

    r303 r304  
    1717/* callbacks return true to stay installed, false to be removed */ 
    1818typedef bool (*subscriber_callback) (SpreadConnection * spread_connection, 
    19                                      const std::string & sender,  
    20                                      const std::string & group,  
    21                                      const int message_type,  
    22                                      const char * message,  
    23                                      const int message_len); 
     19                                     const std::string & sender, 
     20                                     const std::string & group, 
     21                                     const int message_type, 
     22                                     const char * message, 
     23                                     const int message_len, 
     24                                     void * data); 
     25typedef struct SubscriberCallbackInfo SubscriberCallbackInfo; 
     26struct SubscriberCallbackInfo 
     27{ 
     28    subscriber_callback callback; 
     29    void * data; 
     30}; 
    2431 
    2532struct QueuedMessage 
    2633{ 
    27     std::string group; 
     34    char * group; 
    2835    int message_type; 
    2936    char * message; 
     
    3441{ 
    3542    public: 
    36         SpreadConnection (const std::string & name,  
     43        SpreadConnection (const std::string & name, 
    3744                          const std::string & private_name); 
    3845        ~SpreadConnection (); 
    3946 
    4047        void subscribe (const std::string & sender, const std::string & group, 
    41                         const int message_type, subscriber_callback callback); 
     48                        const int message_type, 
     49                        SubscriberCallbackInfo * callback); 
    4250        void queue (const std::string & group, const int message_type, 
    4351                    const char * message, const int message_len); 
    4452        void run (int count); 
     53 
     54        void set_receive_self (bool receive_self) 
     55        { 
     56            this->receive_self = receive_self; 
     57        } 
     58 
     59        std::string get_private_group () 
     60        { 
     61            return this->private_group; 
     62        } 
    4563 
    4664    private: 
     
    5169        std::string group; 
    5270        std::string private_group; 
     71        bool receive_self; 
    5372        mailbox mbox; 
    5473        std::map<std::string, std::vector<std::string> > groups; 
    55         std::map<std::string, std::map<std::string,  
    56             std::map<int, std::vector<subscriber_callback> > > > subscriptions; 
     74        std::map<std::string, std::map<std::string, std::map<int, 
     75            std::vector<SubscriberCallbackInfo *> > > > subscriptions; 
    5776        std::queue<QueuedMessage *> pending_messages; 
    5877 
     
    6281                       const int message_type, const char * message, 
    6382                       const int message_len); 
    64         void make_callbacks (std::vector<subscriber_callback> callbacks, 
    65                              const std::string & sender,  
     83        void make_callbacks (std::vector<SubscriberCallbackInfo *> callbacks, 
     84                             const std::string & sender, 
    6685                             const std::string & group, 
    6786                             const int message_type, const char * message,