Changeset 332
- Timestamp:
- 03/13/08 01:10:04 (8 months ago)
- Location:
- trunk/throxy/src
- Files:
-
- 8 modified
-
ServiceMonitor.cpp (modified) (10 diffs)
-
ServiceMonitor.h (modified) (6 diffs)
-
StaticServiceMonitor.cpp (modified) (2 diffs)
-
ThrudocHandler.cpp (modified) (3 diffs)
-
ThrudocHandler.h (modified) (1 diff)
-
main.cpp (modified) (6 diffs)
-
static.conf (modified) (1 diff)
-
throxy.conf (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
trunk/throxy/src/ServiceMonitor.cpp
r285 r332 15 15 16 16 ServiceNode::ServiceNode(const string name, const string host, const int port, const bool master) 17 : name(name), host(host), port(port), master(master), active(false), last_ping(-1)17 : name(name), master(master), host(host), port(port), active(false), last_ping(-1) 18 18 { 19 19 … … 26 26 27 27 ServiceNode::ServiceNode() 28 : master(false) 28 29 { 29 30 … … 40 41 41 42 if( !socket->isOpen() || active == false ) 42 socket->open(); 43 transport->open(); 44 active = true; 45 return true; 43 46 44 47 //Call ping() this doesn't need to exist 45 48 protocol->writeMessageBegin("ping", facebook::thrift::protocol::T_CALL, 0); 49 50 //protocol->writeStructBegin("Thrudoc_ping_args"); 51 //protocol->writeFieldStop(); 52 //protocol->writeStructEnd(); 53 46 54 protocol->writeMessageEnd(); 47 55 protocol->getTransport()->flush(); 48 56 protocol->getTransport()->writeEnd(); 49 57 58 59 60 LOG4CXX_INFO(logger,"Connected to remote host"); 50 61 int32_t rseqid = 0; 51 62 std::string fname; … … 58 69 protocol->getTransport()->readEnd(); 59 70 71 72 60 73 if(mtype == facebook::thrift::protocol::T_EXCEPTION || 61 74 mtype == facebook::thrift::protocol::T_REPLY ){ … … 101 114 102 115 116 shared_ptr<ServiceNode> ServiceNode::clone() 117 { 118 return shared_ptr<ServiceNode>( new ServiceNode(name,host,port,master) ); 119 } 120 103 121 //////////////////////////////////////////////////////////////////////////// 104 122 105 123 LoggerPtr ServicePartition::logger (Logger::getLogger ("ServicePartition")); 106 124 107 ServicePartition::ServicePartition(const std::string name )108 : name(name) 125 ServicePartition::ServicePartition(const std::string name, unsigned int pool_size) 126 : name(name), pool_size(pool_size) 109 127 { 110 128 … … 120 138 vector<string> names; 121 139 122 map<string, shared_ptr<ServiceNode> >::iterator it;140 map<string, vector<shared_ptr<ServiceNode> > >::iterator it; 123 141 124 142 for(it = service_nodes.begin(); it != service_nodes.end(); ++it){ … … 131 149 void ServicePartition::addServiceNode( shared_ptr<ServiceNode> node ) 132 150 { 133 service_nodes[node->name] = node; 151 152 vector< shared_ptr<ServiceNode> > nodes; 153 154 nodes.push_back( node ); 155 156 for(unsigned int i=0; i<(pool_size-1); i++){ 157 nodes.push_back( node->clone() ); 158 } 159 160 service_nodes[node->name] = nodes; 134 161 } 135 162 … … 141 168 shared_ptr<ServiceNode> ServicePartition::getServiceNode( std::string _name ) 142 169 { 143 map<string, shared_ptr<ServiceNode> >::iterator it = service_nodes.find( _name );170 map<string, vector<shared_ptr<ServiceNode> > >::iterator it = service_nodes.find( _name ); 144 171 145 172 if(it == service_nodes.end()) 146 173 throw TException("Unknown node "+_name); 147 174 148 return it->second; 149 } 175 //return a random connection to this node 176 return it->second[ (rand() * 10000 ) % pool_size]; 177 } 178 179 shared_ptr<TProtocol> ServicePartition::getConnection( bool master ) 180 { 181 182 map<string, vector< shared_ptr<ServiceNode> > >::iterator it; 183 184 for(it=service_nodes.begin(); it!=service_nodes.end(); ++it){ 185 186 if( (master && it->second[0]->master) || !master ) 187 return it->second[(rand() * 10000 ) % pool_size]->getConnection(); 188 189 } 190 191 throw TException("No connections available "); 192 } 193 194 150 195 151 196 //////////////////////////////////////////////////////////////////////////// … … 185 230 void ServiceMonitor::delServicePartition( string name ) 186 231 { 187 188 232 service_partitions.erase(name); 189 190 233 } 191 234 … … 201 244 return it->second; 202 245 } 246 247 248 249 vector< shared_ptr<TProtocol> > ServiceMonitor::getConnections( bool masters ) 250 { 251 252 vector< shared_ptr<TProtocol> > nodes; 253 254 map<string, shared_ptr<ServicePartition> >::iterator it; 255 256 for(it=service_partitions.begin(); it!=service_partitions.end(); ++it){ 257 258 nodes.push_back( it->second->getConnection( masters ) ); 259 } 260 261 return nodes; 262 } -
trunk/throxy/src/ServiceMonitor.h
r285 r332 17 17 bool ping(); 18 18 boost::shared_ptr<facebook::thrift::protocol::TProtocol> getConnection(); 19 boost::shared_ptr<ServiceNode> clone(); // makin copies 19 20 20 21 const std::string name; 22 const bool master; 23 21 24 protected: 22 25 ServiceNode(); … … 25 28 26 29 int port; 27 bool master;28 29 30 bool active; 30 31 uint64_t last_ping; … … 42 43 { 43 44 public: 44 ServicePartition(const std::string name );45 ServicePartition(const std::string name, unsigned int pool_size = 10 ); 45 46 ~ServicePartition(); 46 47 … … 51 52 boost::shared_ptr<ServiceNode> getServiceNode( std::string name ); 52 53 54 boost::shared_ptr<facebook::thrift::protocol::TProtocol> getConnection(bool master); 55 53 56 const std::string name; 54 57 … … 56 59 ServicePartition(); 57 60 58 std::map< std::string, boost::shared_ptr<ServiceNode> > service_nodes;61 const unsigned int pool_size; 59 62 60 63 std::map< std::string, std::vector< boost::shared_ptr<ServiceNode> > > service_nodes; 61 64 62 65 static log4cxx::LoggerPtr logger; … … 76 79 boost::shared_ptr<ServicePartition> getServicePartition( std::string name ); 77 80 81 std::vector< boost::shared_ptr<facebook::thrift::protocol::TProtocol> > getConnections( bool masters = false ); 82 78 83 protected: 79 84 std::map< std::string, boost::shared_ptr<ServicePartition> > service_partitions; -
trunk/throxy/src/StaticServiceMonitor.cpp
r285 r332 48 48 break; 49 49 50 51 50 LOG4CXX_INFO(logger, string("Partition node added: ")+n_name); 52 51 … … 63 62 this->addServicePartition(part); 64 63 } 64 65 cerr<<"Done"<<endl; 65 66 } 66 67 67 68 69 StaticServiceMonitor::~StaticServiceMonitor() 70 { 68 71 72 } 73 74 -
trunk/throxy/src/ThrudocHandler.cpp
r285 r332 16 16 using namespace log4cxx; 17 17 using namespace std; 18 using namespace facebook::thrift::protocol; 18 19 19 20 LoggerPtr ThrudocHandler::logger (Logger::getLogger ("ThrudocHandler")); … … 21 22 ThrudocHandler::ThrudocHandler () 22 23 { 24 //never invoked (private) 25 } 23 26 27 ThrudocHandler::ThrudocHandler(shared_ptr<ServiceMonitor> svc_mon) 28 : monitor(svc_mon) 29 { 24 30 25 31 } … … 27 33 void ThrudocHandler::getBuckets (vector<string> &_return) 28 34 { 35 vector< shared_ptr<TProtocol> > protocols = monitor->getConnections( true ); 29 36 37 shared_ptr<MyThrudocClient> client( new MyThrudocClient() ); 38 39 //Send data to each partition 40 for(size_t i=0; i<protocols.size(); i++){ 41 client->setProtocol(protocols[i]); 42 43 client->send_getBuckets(); 44 } 45 46 map<string, int> all_buckets; 47 48 //Recv data from each partition 49 for(size_t i=0; i<protocols.size(); i++){ 50 client->setProtocol(protocols[i]); 51 52 vector<string> buckets; 53 client->recv_getBuckets(buckets); 54 55 //Count buckets from each partition 56 for(size_t j=0; j<buckets.size(); j++){ 57 map<string, int>::iterator it = all_buckets.find( buckets[j] ); 58 59 if( it == all_buckets.end() ){ 60 all_buckets[ buckets[j] ] = 1; 61 }else{ 62 all_buckets[ buckets[j] ]++; 63 } 64 } 65 } 66 67 // 68 map<string, int>::iterator it; 69 70 for(it = all_buckets.begin(); it != all_buckets.end(); ++it){ 71 72 if(all_buckets.size() != protocols.size()){ 73 LOG4CXX_ERROR(logger,"Parition missing bucket"+it->first); 74 } 75 76 _return.push_back(it->first); 77 } 30 78 31 79 } -
trunk/throxy/src/ThrudocHandler.h
r285 r332 11 11 #include <log4cxx/logger.h> 12 12 13 #include "ServiceMonitor.h" 14 13 15 class ThrudocHandler : virtual public thrudoc::ThrudocIf { 14 public:15 ThrudocHandler ();16 public: 17 ThrudocHandler (boost::shared_ptr<ServiceMonitor> svc_mon); 16 18 17 void getBuckets (std::vector<std::string> &_return);19 void getBuckets (std::vector<std::string> &_return); 18 20 19 void put (const std::string &bucket, const std::string &key,20 const std::string &value);21 void put (const std::string &bucket, const std::string &key, 22 const std::string &value); 21 23 22 void putValue (std::string &_return, const std::string &bucket,23 const std::string &value);24 void putValue (std::string &_return, const std::string &bucket, 25 const std::string &value); 24 26 25 void get (std::string &_return, const std::string &bucket,26 const std::string &key);27 void get (std::string &_return, const std::string &bucket, 28 const std::string &key); 27 29 28 void remove (const std::string &bucket, const std::string &key);30 void remove (const std::string &bucket, const std::string &key); 29 31 30 void scan (thrudoc::ScanResponse &_return,31 const std::string &bucket,32 const std::string &seed, int32_t count);32 void scan (thrudoc::ScanResponse &_return, 33 const std::string &bucket, 34 const std::string &seed, int32_t count); 33 35 34 void putList(std::vector<thrudoc::ThrudocException> &_return,35 const std::vector<thrudoc::Element> &elements);36 void putList(std::vector<thrudoc::ThrudocException> &_return, 37 const std::vector<thrudoc::Element> &elements); 36 38 37 void getList(std::vector<thrudoc::ListResponse> &_return,38 const std::vector<thrudoc::Element> &elements);39 void getList(std::vector<thrudoc::ListResponse> &_return, 40 const std::vector<thrudoc::Element> &elements); 39 41 40 void removeList(std::vector<thrudoc::ThrudocException> &_return,41 const std::vector<thrudoc::Element> &elements);42 void removeList(std::vector<thrudoc::ThrudocException> &_return, 43 const std::vector<thrudoc::Element> &elements); 42 44 43 void putValueList(std::vector<thrudoc::ListResponse> &_return,44 const std::vector<thrudoc::Element> &elements);45 void putValueList(std::vector<thrudoc::ListResponse> &_return, 46 const std::vector<thrudoc::Element> &elements); 45 47 46 void admin (std::string &_return, const std::string &op,47 const std::string &data);48 void admin (std::string &_return, const std::string &op, 49 const std::string &data); 48 50 49 private: 50 static log4cxx::LoggerPtr logger; 51 private: 52 ThrudocHandler(); 53 54 static log4cxx::LoggerPtr logger; 55 56 boost::shared_ptr<ServiceMonitor> monitor; 57 58 59 }; 60 61 62 class MyThrudocClient : public thrudoc::ThrudocClient 63 { 64 public: 65 MyThrudocClient(): 66 ThrudocClient(boost::shared_ptr<facebook::thrift::protocol::TProtocol>()){}; 67 68 //Let me reuse this client for each connection 69 void setProtocol(boost::shared_ptr<facebook::thrift::protocol::TProtocol> p){ 70 this->piprot_ = p; 71 poprot_ = p; 72 iprot_ = p.get(); 73 oprot_ = p.get(); 74 } 75 51 76 }; 52 77 -
trunk/throxy/src/main.cpp
r285 r332 1 #include <concurrency/ThreadManager.h> 2 #include <concurrency/PosixThreadFactory.h> 3 #include <protocol/TBinaryProtocol.h> 4 #include <server/TNonblockingServer.h> 5 #include <transport/TServerSocket.h> 6 #include <transport/TTransportUtils.h> 1 #include <thrift/TProcessor.h> 2 #include <thrift/concurrency/ThreadManager.h> 3 #include <thrift/concurrency/PosixThreadFactory.h> 4 #include <thrift/protocol/TBinaryProtocol.h> 5 #include <thrift/server/TNonblockingServer.h> 6 #include <thrift/transport/TServerSocket.h> 7 #include <thrift/transport/TTransportUtils.h> 7 8 8 9 … … 16 17 #include "utils.h" 17 18 18 #include "Thrudoc.h" 19 #include "ThrudocHandler.h" 20 #include "StaticServiceMonitor.h" 19 21 #include "Thrudex.h" 20 22 … … 90 92 shared_ptr<PosixThreadFactory> thread_factory(new PosixThreadFactory()); 91 93 shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory()); 94 thread_factory->setDetached(false); 92 95 93 96 //collection of server to throxy … … 96 99 97 100 //Thrudoc enabled 98 if( ConfigManager->read<int>("THRUDOC_PORT",-1) == -1 ) {101 if( ConfigManager->read<int>("THRUDOC_PORT",-1) != -1 ) { 99 102 100 103 int thrudoc_port = ConfigManager->read<int>("THRUDOC_PORT"); 101 int thread_count = 3 0;104 int thread_count = 3; 102 105 103 shared_ptr<ThrudocHandler> handler getThrudocHandler();104 shared_ptr<TProcessor> processor (newThrudocProcessor(handler));106 shared_ptr<ThrudocHandler> handler = getThrudocHandler(); 107 shared_ptr<TProcessor> processor(new thrudoc::ThrudocProcessor(handler)); 105 108 106 109 shared_ptr<ThreadManager> thread_manager = … … 114 117 115 118 119 116 120 shared_ptr<Thread> t = 117 121 thread_factory->newThread(shared_ptr<TServer>(new TNonblockingServer(processor, protocol_factory, 118 122 thrudoc_port,thread_manager))); 119 123 120 server_thread .push_back(t);124 server_threads.push_back(t); 121 125 } 122 126 … … 139 143 140 144 //Start servers 141 for( int i=0; i<server_threads.size(); i++){145 for(size_t i=0; i<server_threads.size(); i++){ 142 146 server_threads[i]->start(); 143 147 } 144 148 145 149 //wait for server threads 146 for( int i=0; i<server_threads.size(); i++){150 for(size_t i=0; i<server_threads.size(); i++){ 147 151 server_threads[i]->join(); 148 152 } 149 150 153 151 154 -
trunk/throxy/src/static.conf
r285 r332 13 13 PARTITION_2 = p2 14 14 15 PARTITION_2_NODE_1_NAME = node 115 PARTITION_2_NODE_1_NAME = node2 16 16 PARTITION_2_NODE_1_HOST = 127.0.0.1 17 17 PARTITION_2_NODE_1_PORT = 1978 -
trunk/throxy/src/throxy.conf
r285 r332 4 4 THRUDOC_PORT = 1998 5 5 THRUDOC_MONITOR = STATIC 6 THRUDOC_MONITOR_ARG1 = thrudoc.conf 6 THRUDOC_MONITOR_ARG1 = static.conf 7 8 9 # Set root logger level to DEBUG and its only appender to A1. 10 log4j.rootLogger=DEBUG, A1 11 12 # A1 is set to be a ConsoleAppender. 13 log4j.appender.A1=org.apache.log4j.ConsoleAppender 14 15 # A1 uses PatternLayout. 16 log4j.appender.A1.layout=org.apache.log4j.PatternLayout 17 log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n 7 18 8 19 9 20 10 11
