Changeset 332

Show
Ignore:
Timestamp:
03/13/08 01:10:04 (8 months ago)
Author:
jake
Message:

Incremental update, not functional yet...

Location:
trunk/throxy/src
Files:
8 modified

Legend:

Unmodified
Added
Removed
  • trunk/throxy/src/ServiceMonitor.cpp

    r285 r332  
    1515 
    1616ServiceNode::ServiceNode(const string name, const string host, const int port, const bool master) 
    17     : name(name), host(host), port(port), master(master), active(false), last_ping(-1) 
     17    : name(name), master(master), host(host), port(port), active(false), last_ping(-1) 
    1818{ 
    1919 
     
    2626 
    2727ServiceNode::ServiceNode() 
     28    : master(false) 
    2829{ 
    2930 
     
    4041 
    4142        if( !socket->isOpen() || active == false ) 
    42             socket->open(); 
     43            transport->open(); 
     44        active = true; 
     45        return true; 
    4346 
    4447        //Call ping() this doesn't need to exist 
    4548        protocol->writeMessageBegin("ping", facebook::thrift::protocol::T_CALL, 0); 
     49 
     50        //protocol->writeStructBegin("Thrudoc_ping_args"); 
     51        //protocol->writeFieldStop(); 
     52        //protocol->writeStructEnd(); 
     53 
    4654        protocol->writeMessageEnd(); 
    4755        protocol->getTransport()->flush(); 
    4856        protocol->getTransport()->writeEnd(); 
    4957 
     58 
     59 
     60        LOG4CXX_INFO(logger,"Connected to remote host"); 
    5061        int32_t rseqid = 0; 
    5162        std::string fname; 
     
    5869        protocol->getTransport()->readEnd(); 
    5970 
     71 
     72 
    6073        if(mtype == facebook::thrift::protocol::T_EXCEPTION || 
    6174           mtype == facebook::thrift::protocol::T_REPLY ){ 
     
    101114 
    102115 
     116shared_ptr<ServiceNode> ServiceNode::clone() 
     117{ 
     118    return shared_ptr<ServiceNode>( new ServiceNode(name,host,port,master) ); 
     119} 
     120 
    103121//////////////////////////////////////////////////////////////////////////// 
    104122 
    105123LoggerPtr ServicePartition::logger (Logger::getLogger ("ServicePartition")); 
    106124 
    107 ServicePartition::ServicePartition(const std::string name) 
    108     : name(name) 
     125ServicePartition::ServicePartition(const std::string name, unsigned int pool_size) 
     126    : name(name), pool_size(pool_size) 
    109127{ 
    110128 
     
    120138    vector<string> names; 
    121139 
    122     map<string, shared_ptr<ServiceNode> >::iterator it; 
     140    map<string, vector<shared_ptr<ServiceNode> > >::iterator it; 
    123141 
    124142    for(it = service_nodes.begin(); it != service_nodes.end(); ++it){ 
     
    131149void ServicePartition::addServiceNode( shared_ptr<ServiceNode> node ) 
    132150{ 
    133     service_nodes[node->name] = node; 
     151 
     152    vector< shared_ptr<ServiceNode> > nodes; 
     153 
     154    nodes.push_back( node ); 
     155 
     156    for(unsigned int i=0; i<(pool_size-1); i++){ 
     157        nodes.push_back( node->clone() ); 
     158    } 
     159 
     160    service_nodes[node->name] = nodes; 
    134161} 
    135162 
     
    141168shared_ptr<ServiceNode> ServicePartition::getServiceNode( std::string _name ) 
    142169{ 
    143     map<string, shared_ptr<ServiceNode> >::iterator it = service_nodes.find( _name ); 
     170    map<string, vector<shared_ptr<ServiceNode> > >::iterator it = service_nodes.find( _name ); 
    144171 
    145172    if(it == service_nodes.end()) 
    146173        throw TException("Unknown node "+_name); 
    147174 
    148     return it->second; 
    149 } 
     175    //return a random connection to this node 
     176    return it->second[ (rand() * 10000 ) % pool_size]; 
     177} 
     178 
     179shared_ptr<TProtocol> ServicePartition::getConnection( bool master ) 
     180{ 
     181 
     182    map<string, vector< shared_ptr<ServiceNode> > >::iterator it; 
     183 
     184    for(it=service_nodes.begin(); it!=service_nodes.end(); ++it){ 
     185 
     186        if( (master && it->second[0]->master) || !master ) 
     187            return it->second[(rand() * 10000 ) % pool_size]->getConnection(); 
     188 
     189    } 
     190 
     191    throw TException("No connections available "); 
     192} 
     193 
     194 
    150195 
    151196//////////////////////////////////////////////////////////////////////////// 
     
    185230void ServiceMonitor::delServicePartition( string name ) 
    186231{ 
    187  
    188232    service_partitions.erase(name); 
    189  
    190233} 
    191234 
     
    201244    return it->second; 
    202245} 
     246 
     247 
     248 
     249vector< shared_ptr<TProtocol> > ServiceMonitor::getConnections( bool masters ) 
     250{ 
     251 
     252    vector< shared_ptr<TProtocol> > nodes; 
     253 
     254    map<string, shared_ptr<ServicePartition> >::iterator it; 
     255 
     256    for(it=service_partitions.begin(); it!=service_partitions.end(); ++it){ 
     257 
     258        nodes.push_back( it->second->getConnection( masters ) ); 
     259    } 
     260 
     261    return nodes; 
     262} 
  • trunk/throxy/src/ServiceMonitor.h

    r285 r332  
    1717    bool    ping(); 
    1818    boost::shared_ptr<facebook::thrift::protocol::TProtocol> getConnection(); 
     19        boost::shared_ptr<ServiceNode>                       clone(); // makin copies 
    1920 
    2021    const std::string name; 
     22    const bool        master; 
     23 
    2124 protected: 
    2225    ServiceNode(); 
     
    2528 
    2629    int         port; 
    27     bool        master; 
    28  
    2930    bool        active; 
    3031    uint64_t    last_ping; 
     
    4243{ 
    4344 public: 
    44     ServicePartition(const std::string name); 
     45    ServicePartition(const std::string name, unsigned int pool_size = 10 ); 
    4546    ~ServicePartition(); 
    4647 
     
    5152    boost::shared_ptr<ServiceNode> getServiceNode( std::string name ); 
    5253 
     54    boost::shared_ptr<facebook::thrift::protocol::TProtocol>  getConnection(bool master); 
     55 
    5356    const std::string name; 
    5457 
     
    5659    ServicePartition(); 
    5760 
    58     std::map< std::string, boost::shared_ptr<ServiceNode> > service_nodes; 
     61    const unsigned int pool_size; 
    5962 
    60  
     63    std::map< std::string, std::vector< boost::shared_ptr<ServiceNode> > > service_nodes; 
    6164 
    6265    static log4cxx::LoggerPtr logger; 
     
    7679    boost::shared_ptr<ServicePartition> getServicePartition( std::string name ); 
    7780 
     81    std::vector< boost::shared_ptr<facebook::thrift::protocol::TProtocol> > getConnections( bool masters = false ); 
     82 
    7883 protected: 
    7984    std::map< std::string, boost::shared_ptr<ServicePartition> > service_partitions; 
  • trunk/throxy/src/StaticServiceMonitor.cpp

    r285 r332  
    4848                break; 
    4949 
    50  
    5150            LOG4CXX_INFO(logger, string("Partition node added: ")+n_name); 
    5251 
     
    6362        this->addServicePartition(part); 
    6463    } 
     64 
     65    cerr<<"Done"<<endl; 
    6566} 
    6667 
    6768 
     69StaticServiceMonitor::~StaticServiceMonitor() 
     70{ 
    6871 
     72} 
     73 
     74 
  • trunk/throxy/src/ThrudocHandler.cpp

    r285 r332  
    1616using namespace log4cxx; 
    1717using namespace std; 
     18using namespace facebook::thrift::protocol; 
    1819 
    1920LoggerPtr ThrudocHandler::logger (Logger::getLogger ("ThrudocHandler")); 
     
    2122ThrudocHandler::ThrudocHandler () 
    2223{ 
     24    //never invoked (private) 
     25} 
    2326 
     27ThrudocHandler::ThrudocHandler(shared_ptr<ServiceMonitor> svc_mon) 
     28    : monitor(svc_mon) 
     29{ 
    2430 
    2531} 
     
    2733void ThrudocHandler::getBuckets (vector<string> &_return) 
    2834{ 
     35    vector< shared_ptr<TProtocol> >  protocols = monitor->getConnections( true ); 
    2936 
     37    shared_ptr<MyThrudocClient>  client( new MyThrudocClient() ); 
     38 
     39    //Send data to each partition 
     40    for(size_t i=0; i<protocols.size(); i++){ 
     41        client->setProtocol(protocols[i]); 
     42 
     43        client->send_getBuckets(); 
     44    } 
     45 
     46    map<string, int> all_buckets; 
     47 
     48    //Recv data from each partition 
     49    for(size_t i=0; i<protocols.size(); i++){ 
     50        client->setProtocol(protocols[i]); 
     51 
     52        vector<string> buckets; 
     53        client->recv_getBuckets(buckets); 
     54 
     55        //Count buckets from each partition 
     56        for(size_t j=0; j<buckets.size(); j++){ 
     57            map<string, int>::iterator it = all_buckets.find( buckets[j] ); 
     58 
     59            if( it == all_buckets.end() ){ 
     60                all_buckets[ buckets[j] ] = 1; 
     61            }else{ 
     62                all_buckets[ buckets[j] ]++; 
     63            } 
     64        } 
     65    } 
     66 
     67    // 
     68    map<string, int>::iterator it; 
     69 
     70    for(it = all_buckets.begin(); it != all_buckets.end(); ++it){ 
     71 
     72        if(all_buckets.size() != protocols.size()){ 
     73            LOG4CXX_ERROR(logger,"Parition missing bucket"+it->first); 
     74        } 
     75 
     76        _return.push_back(it->first); 
     77    } 
    3078 
    3179} 
  • trunk/throxy/src/ThrudocHandler.h

    r285 r332  
    1111#include <log4cxx/logger.h> 
    1212 
     13#include "ServiceMonitor.h" 
     14 
    1315class ThrudocHandler : virtual public thrudoc::ThrudocIf { 
    14     public: 
    15         ThrudocHandler (); 
     16 public: 
     17    ThrudocHandler (boost::shared_ptr<ServiceMonitor> svc_mon); 
    1618 
    17         void getBuckets (std::vector<std::string> &_return); 
     19    void getBuckets (std::vector<std::string> &_return); 
    1820 
    19         void put (const std::string &bucket, const std::string &key, 
    20                   const std::string &value); 
     21    void put (const std::string &bucket, const std::string &key, 
     22              const std::string &value); 
    2123 
    22         void putValue (std::string &_return, const std::string &bucket, 
    23                        const std::string &value); 
     24    void putValue (std::string &_return, const std::string &bucket, 
     25                   const std::string &value); 
    2426 
    25         void get (std::string &_return, const std::string &bucket, 
    26                   const std::string &key); 
     27    void get (std::string &_return, const std::string &bucket, 
     28              const std::string &key); 
    2729 
    28         void remove (const std::string &bucket, const std::string &key); 
     30    void remove (const std::string &bucket, const std::string &key); 
    2931 
    30         void scan (thrudoc::ScanResponse &_return, 
    31                    const std::string &bucket, 
    32                    const std::string &seed, int32_t count); 
     32    void scan (thrudoc::ScanResponse &_return, 
     33               const std::string &bucket, 
     34               const std::string &seed, int32_t count); 
    3335 
    34         void putList(std::vector<thrudoc::ThrudocException> &_return, 
    35                      const std::vector<thrudoc::Element> &elements); 
     36    void putList(std::vector<thrudoc::ThrudocException> &_return, 
     37                 const std::vector<thrudoc::Element> &elements); 
    3638 
    37         void getList(std::vector<thrudoc::ListResponse> &_return, 
    38                      const std::vector<thrudoc::Element> &elements); 
     39    void getList(std::vector<thrudoc::ListResponse> &_return, 
     40                 const std::vector<thrudoc::Element> &elements); 
    3941 
    40         void removeList(std::vector<thrudoc::ThrudocException> &_return, 
    41                         const std::vector<thrudoc::Element> &elements); 
     42    void removeList(std::vector<thrudoc::ThrudocException> &_return, 
     43                    const std::vector<thrudoc::Element> &elements); 
    4244 
    43         void putValueList(std::vector<thrudoc::ListResponse> &_return, 
    44                           const std::vector<thrudoc::Element> &elements); 
     45    void putValueList(std::vector<thrudoc::ListResponse> &_return, 
     46                      const std::vector<thrudoc::Element> &elements); 
    4547 
    46         void admin (std::string &_return, const std::string &op, 
    47                     const std::string &data); 
     48    void admin (std::string &_return, const std::string &op, 
     49                const std::string &data); 
    4850 
    49     private: 
    50         static log4cxx::LoggerPtr logger; 
     51 private: 
     52    ThrudocHandler(); 
     53 
     54    static log4cxx::LoggerPtr         logger; 
     55 
     56    boost::shared_ptr<ServiceMonitor> monitor; 
     57 
     58 
     59}; 
     60 
     61 
     62class MyThrudocClient : public thrudoc::ThrudocClient 
     63{ 
     64 public: 
     65 MyThrudocClient(): 
     66    ThrudocClient(boost::shared_ptr<facebook::thrift::protocol::TProtocol>()){}; 
     67 
     68    //Let me reuse this client for each connection 
     69    void setProtocol(boost::shared_ptr<facebook::thrift::protocol::TProtocol> p){ 
     70        this->piprot_ = p; 
     71        poprot_ = p; 
     72        iprot_ = p.get(); 
     73        oprot_ = p.get(); 
     74    } 
     75 
    5176}; 
    5277 
  • trunk/throxy/src/main.cpp

    r285 r332  
    1 #include <concurrency/ThreadManager.h> 
    2 #include <concurrency/PosixThreadFactory.h> 
    3 #include <protocol/TBinaryProtocol.h> 
    4 #include <server/TNonblockingServer.h> 
    5 #include <transport/TServerSocket.h> 
    6 #include <transport/TTransportUtils.h> 
     1#include <thrift/TProcessor.h> 
     2#include <thrift/concurrency/ThreadManager.h> 
     3#include <thrift/concurrency/PosixThreadFactory.h> 
     4#include <thrift/protocol/TBinaryProtocol.h> 
     5#include <thrift/server/TNonblockingServer.h> 
     6#include <thrift/transport/TServerSocket.h> 
     7#include <thrift/transport/TTransportUtils.h> 
    78 
    89 
     
    1617#include "utils.h" 
    1718 
    18 #include "Thrudoc.h" 
     19#include "ThrudocHandler.h" 
     20#include "StaticServiceMonitor.h" 
    1921#include "Thrudex.h" 
    2022 
     
    9092        shared_ptr<PosixThreadFactory> thread_factory(new PosixThreadFactory()); 
    9193        shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory()); 
     94        thread_factory->setDetached(false); 
    9295 
    9396        //collection of server to throxy 
     
    9699 
    97100        //Thrudoc enabled 
    98         if( ConfigManager->read<int>("THRUDOC_PORT",-1) == -1 ) { 
     101        if( ConfigManager->read<int>("THRUDOC_PORT",-1) != -1 ) { 
    99102 
    100103            int    thrudoc_port  = ConfigManager->read<int>("THRUDOC_PORT"); 
    101             int    thread_count  = 30; 
     104            int    thread_count  = 3; 
    102105 
    103             shared_ptr<ThrudocHandler>  handler         getThrudocHandler(); 
    104             shared_ptr<TProcessor>      processor        (new ThrudocProcessor(handler)); 
     106            shared_ptr<ThrudocHandler>  handler = getThrudocHandler(); 
     107            shared_ptr<TProcessor>      processor(new thrudoc::ThrudocProcessor(handler)); 
    105108 
    106109            shared_ptr<ThreadManager> thread_manager = 
     
    114117 
    115118 
     119 
    116120            shared_ptr<Thread> t = 
    117121                thread_factory->newThread(shared_ptr<TServer>(new TNonblockingServer(processor, protocol_factory, 
    118122                                                                                     thrudoc_port,thread_manager))); 
    119123 
    120             server_thread.push_back(t); 
     124            server_threads.push_back(t); 
    121125        } 
    122126 
     
    139143 
    140144        //Start servers 
    141         for(int i=0; i<server_threads.size(); i++){ 
     145        for(size_t i=0; i<server_threads.size(); i++){ 
    142146            server_threads[i]->start(); 
    143147        } 
    144148 
    145149        //wait for server threads 
    146         for(int i=0; i<server_threads.size(); i++){ 
     150        for(size_t i=0; i<server_threads.size(); i++){ 
    147151            server_threads[i]->join(); 
    148152        } 
    149  
    150153 
    151154 
  • trunk/throxy/src/static.conf

    r285 r332  
    1313PARTITION_2 = p2 
    1414 
    15 PARTITION_2_NODE_1_NAME   = node1 
     15PARTITION_2_NODE_1_NAME   = node2 
    1616PARTITION_2_NODE_1_HOST   = 127.0.0.1 
    1717PARTITION_2_NODE_1_PORT   = 1978 
  • trunk/throxy/src/throxy.conf

    r285 r332  
    44THRUDOC_PORT         = 1998 
    55THRUDOC_MONITOR      = STATIC 
    6 THRUDOC_MONITOR_ARG1 = thrudoc.conf 
     6THRUDOC_MONITOR_ARG1 = static.conf 
     7 
     8 
     9# Set root logger level to DEBUG and its only appender to A1. 
     10log4j.rootLogger=DEBUG, A1 
     11 
     12# A1 is set to be a ConsoleAppender. 
     13log4j.appender.A1=org.apache.log4j.ConsoleAppender 
     14 
     15# A1 uses PatternLayout. 
     16log4j.appender.A1.layout=org.apache.log4j.PatternLayout 
     17log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n 
    718 
    819 
    920 
    10  
    11