Changeset 283
- Timestamp:
- 02/18/08 09:06:30 (9 months ago)
- Location:
- trunk/thrudoc
- Files:
-
- 3 modified
- 2 moved
-
examples/thrudoc.conf (modified) (1 diff)
-
src/Makefile.am (modified) (2 diffs)
-
src/ReplicationBackend.cpp (moved) (moved from trunk/thrudoc/src/SpreadReplicationBackend.cpp) (30 diffs)
-
src/ReplicationBackend.h (moved) (moved from trunk/thrudoc/src/SpreadReplicationBackend.h) (6 diffs)
-
src/app_helpers.cpp (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/thrudoc/examples/thrudoc.conf
r234 r283 38 38 #SPREAD_GROUP = thrudoc 39 39 40 # spread replication backend 41 #REPLICATION_NAME = 4803@localhost 42 #REPLICATION_PRIVATE_NAME = ds_s1 43 #REPLICATION_GROUP = thrudoc 44 #REPLICATION_STATUS_FILE=replication_status 45 #REPLICATION_STATUS_FLUSH_FREQUENCY=30 46 40 47 #ENABLE_BLOOM_FILTER=1 41 48 -
trunk/thrudoc/src/Makefile.am
r251 r283 11 11 include_thrudocdir = $(includedir)/thrudoc 12 12 include_thrudoc_HEADERS = \ 13 app_helpers.h \ 14 BDBBackend.h \ 15 BloomBackend.h \ 16 DiskBackend.h \ 17 LogBackend.h \ 18 MemcachedBackend.h \ 19 MySQLBackend.h \ 20 NBackend.h \ 21 NullBackend.h \ 22 S3Backend.h \ 23 SpreadBackend.h \ 24 StatsBackend.h \ 13 app_helpers.h \ 14 BDBBackend.h \ 15 BloomBackend.h \ 16 DiskBackend.h \ 17 LogBackend.h \ 18 MemcachedBackend.h \ 19 MySQLBackend.h \ 20 NBackend.h \ 21 NullBackend.h \ 22 ReplicationBackend.h \ 23 S3Backend.h \ 24 SpreadBackend.h \ 25 StatsBackend.h \ 25 26 ThrudocBackend.h 26 27 … … 40 41 NBackend.cpp \ 41 42 NullBackend.cpp \ 43 ReplicationBackend.cpp \ 42 44 S3Backend.cpp \ 43 45 StatsBackend.cpp \ -
trunk/thrudoc/src/ReplicationBackend.cpp
r280 r283 7 7 #if HAVE_LIBSPREAD && HAVE_LIBUUID 8 8 9 #include " SpreadReplicationBackend.h"9 #include "ReplicationBackend.h" 10 10 11 11 #include <sys/types.h> … … 21 21 #define UUID_LEN 37 22 22 #define MESSAGE_OVERHEAD 4 23 #define SPREAD_BACKEND_MAX_MESSAGE_SIZE MAX_BUCKET_SIZE + MAX_KEY_SIZE + MAX_VALUE_SIZE + UUID_LEN + MESSAGE_OVERHEAD23 #define REPLICATION_BACKEND_MAX_MESSAGE_SIZE MAX_BUCKET_SIZE + MAX_KEY_SIZE + MAX_VALUE_SIZE + UUID_LEN + MESSAGE_OVERHEAD 24 24 25 25 #define ORIG_MESSAGE_TYPE 1 … … 35 35 string SP_error_to_string (int error); 36 36 37 class SpreadReplicationWait37 class ReplicationWait 38 38 { 39 39 public: 40 40 // uuid is just for logging/debugging purposes 41 SpreadReplicationWait (string uuid, uint32_t max_wait)41 ReplicationWait (string uuid, uint32_t max_wait) 42 42 { 43 43 char buf[128]; 44 sprintf (buf, " SpreadReplicationWait: uuid=%s, max_wait=%d",44 sprintf (buf, "ReplicationWait: uuid=%s, max_wait=%d", 45 45 uuid.c_str (), max_wait); 46 46 LOG4CXX_DEBUG (logger, buf); … … 56 56 } 57 57 58 ~ SpreadReplicationWait ()59 { 60 LOG4CXX_DEBUG (logger, "~ SpreadReplicationWait: uuid=" +58 ~ReplicationWait () 59 { 60 LOG4CXX_DEBUG (logger, "~ReplicationWait: uuid=" + 61 61 this->uuid); 62 62 pthread_cond_destroy (&this->condition); … … 132 132 }; 133 133 134 class SpreadReplicationMessage134 class ReplicationMessage 135 135 { 136 136 public: … … 138 138 { 139 139 max_groups = 5; 140 buf_size = SPREAD_BACKEND_MAX_MESSAGE_SIZE;140 buf_size = REPLICATION_BACKEND_MAX_MESSAGE_SIZE; 141 141 142 142 buf_len = SP_receive (spread_mailbox, &service_type, sender, … … 180 180 } 181 181 182 int parse (const string & spread_private_group)182 int parse (const string & replication_private_group) 183 183 { 184 184 // TODO: need better/safer parsing than scanf... maybe thrift ser. … … 196 196 } 197 197 else if (type == REPLAY_MESSAGE_TYPE && 198 spread_private_group == groups[0])198 replication_private_group == groups[0]) 199 199 { 200 200 LOG4CXX_DEBUG (logger, "parse: replay"); … … 276 276 int buf_size; 277 277 int buf_len; 278 char buf[ SPREAD_BACKEND_MAX_MESSAGE_SIZE];278 char buf[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 279 279 280 280 // repli command stuff … … 287 287 288 288 // private 289 LoggerPtr SpreadReplicationWait::logger (Logger::getLogger ("SpreadReplicationWait")); 290 LoggerPtr SpreadReplicationMessage::logger (Logger::getLogger ("SpreadReplicationMessage")); 291 LoggerPtr SpreadReplicationBackend::logger (Logger::getLogger ("SpreadReplicationBackend")); 292 293 SpreadReplicationBackend::SpreadReplicationBackend (shared_ptr<ThrudocBackend> backend, 294 const string & spread_name, 295 const string & spread_private_name, 296 const string & spread_group) 297 { 298 LOG4CXX_INFO (logger, string ("SpreadReplicationBackend: spread_name=") + 299 spread_name + ", spread_private_name=" + 300 spread_private_name + ", spread_group=" + spread_group); 289 LoggerPtr ReplicationWait::logger (Logger::getLogger ("ReplicationWait")); 290 LoggerPtr ReplicationMessage::logger (Logger::getLogger ("ReplicationMessage")); 291 LoggerPtr ReplicationBackend::logger (Logger::getLogger ("ReplicationBackend")); 292 293 ReplicationBackend::ReplicationBackend (shared_ptr<ThrudocBackend> backend, 294 const string & replication_name, 295 const string & replication_private_name, 296 const string & replication_group, 297 const string & replication_status_file, 298 const int replication_status_flush_frequency) 299 { 300 char buf[1024]; 301 sprintf (buf, "ReplicationBackend: replication_name=%s, replication_private_name=%s, replication_group=%s, replication_status_file=%s, replication_status_flush_frequency=%d", 302 replication_name.c_str (), replication_private_name.c_str (), 303 replication_group.c_str (), replication_status_file.c_str (), 304 replication_status_flush_frequency); 305 LOG4CXX_INFO (logger, buf); 301 306 302 307 this->set_backend (backend); 303 this->spread_name = spread_name; 304 this->spread_private_name = spread_private_name; 305 this->spread_group = spread_group; 308 this->replication_name = replication_name; 309 this->replication_private_name = replication_private_name; 310 this->replication_group = replication_group; 311 this->replication_status_file = replication_status_file; 312 this->replication_status_flush_frequency = replication_status_flush_frequency; 306 313 307 314 char private_group[MAX_GROUP_NAME]; 308 int ret = SP_connect (this-> spread_name.c_str (),309 this-> spread_private_name.c_str (), 0, 1,315 int ret = SP_connect (this->replication_name.c_str (), 316 this->replication_private_name.c_str (), 0, 1, 310 317 &this->spread_mailbox, private_group); 311 318 if (ret < 0) … … 318 325 } 319 326 320 this-> spread_private_group = string (private_group);321 LOG4CXX_INFO (logger, " SpreadBackend: private_group=" +322 this-> spread_private_group);323 324 ret = SP_join (this->spread_mailbox, this-> spread_group.c_str ());327 this->replication_private_group = string (private_group); 328 LOG4CXX_INFO (logger, "ReplicationBackend: private_group=" + 329 this->replication_private_group); 330 331 ret = SP_join (this->spread_mailbox, this->replication_group.c_str ()); 325 332 if (ret < 0) 326 333 { … … 333 340 334 341 int fd; 335 fd = ::open ("last_uuid", 0x0, S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH); 342 fd = ::open (this->replication_status_file.c_str (), 0x0, 343 S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH); 336 344 listener_live = true; // we're live unless we load a last_uuid in a sec 337 345 if (fd) 338 346 { 347 LOG4CXX_DEBUG (logger, "ReplicationBackend: opened status file=" + 348 this->replication_status_file); 339 349 char buf[64] = ""; 340 350 ::read (fd, buf, 64); 341 last_uuid = buf;351 this->last_uuid = buf; 342 352 ::close (fd); 343 if (! last_uuid.empty ())353 if (!this->last_uuid.empty ()) 344 354 { 345 355 listener_live = false; 346 request_next ( last_uuid);347 LOG4CXX_INFO (logger, " SpreadReplicationBackend: loaded last_uuid=" +348 last_uuid);356 request_next (this->last_uuid); 357 LOG4CXX_INFO (logger, "ReplicationBackend: found last_uuid=" + 358 this->last_uuid); 349 359 } 350 360 } … … 355 365 (void *)this) != 0) 356 366 { 357 char error[] = " SpreadReplicationBackend: start_listener_thread failed\n";367 char error[] = "ReplicationBackend: start_listener_thread failed\n"; 358 368 LOG4CXX_ERROR (logger, error); 359 369 ThrudocException e; … … 363 373 } 364 374 365 SpreadReplicationBackend::~SpreadReplicationBackend ()366 { 367 LOG4CXX_INFO (logger, "~ SpreadReplicationBackend");375 ReplicationBackend::~ReplicationBackend () 376 { 377 LOG4CXX_INFO (logger, "~ReplicationBackend"); 368 378 // we're no longer live, don't accept connections 369 379 this->listener_live = false; … … 400 410 // TODO: implement circuit breaker pattern around spread... 401 411 402 void SpreadReplicationBackend::put (const string & bucket, const string & key,412 void ReplicationBackend::put (const string & bucket, const string & key, 403 413 const string & value) 404 414 { 405 char msg[ SPREAD_BACKEND_MAX_MESSAGE_SIZE];415 char msg[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 406 416 string uuid = generate_uuid (); 407 snprintf (msg, SPREAD_BACKEND_MAX_MESSAGE_SIZE, "%s p %s %s %s",417 snprintf (msg, REPLICATION_BACKEND_MAX_MESSAGE_SIZE, "%s p %s %s %s", 408 418 uuid.c_str (), bucket.c_str (), key.c_str (), value.c_str ()); 409 419 … … 411 421 } 412 422 413 void SpreadReplicationBackend::remove (const string & bucket,423 void ReplicationBackend::remove (const string & bucket, 414 424 const string & key ) 415 425 { 416 char msg[ SPREAD_BACKEND_MAX_MESSAGE_SIZE];426 char msg[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 417 427 string uuid = generate_uuid (); 418 snprintf (msg, SPREAD_BACKEND_MAX_MESSAGE_SIZE, "%s r %s %s",428 snprintf (msg, REPLICATION_BACKEND_MAX_MESSAGE_SIZE, "%s r %s %s", 419 429 uuid.c_str (), bucket.c_str (), key.c_str ()); 420 430 … … 422 432 } 423 433 424 string SpreadReplicationBackend::admin (const string & op, const string & data)434 string ReplicationBackend::admin (const string & op, const string & data) 425 435 { 426 436 if (op == "replay_from") … … 430 440 return "done"; 431 441 } 432 char msg[ SPREAD_BACKEND_MAX_MESSAGE_SIZE];442 char msg[REPLICATION_BACKEND_MAX_MESSAGE_SIZE]; 433 443 string uuid = generate_uuid (); 434 snprintf (msg, SPREAD_BACKEND_MAX_MESSAGE_SIZE, "%s a %s %s",444 snprintf (msg, REPLICATION_BACKEND_MAX_MESSAGE_SIZE, "%s a %s %s", 435 445 uuid.c_str (), op.c_str (), data.c_str ()); 436 446 … … 438 448 } 439 449 440 string SpreadReplicationBackend::send_and_wait_for_resp (const char * msg,450 string ReplicationBackend::send_and_wait_for_resp (const char * msg, 441 451 string uuid) 442 452 { 443 453 LOG4CXX_DEBUG (logger, "wait_for_resp: begin uuid=" + uuid); 444 454 string ret; 445 shared_ptr< SpreadReplicationWait> wait (new SpreadReplicationWait (uuid,455 shared_ptr<ReplicationWait> wait (new ReplicationWait (uuid, 446 456 2)); 447 457 // install wait … … 451 461 } 452 462 // send out multi-cast message 453 SP_multicast (this->spread_mailbox, SAFE_MESS, this->spread_group.c_str (), 463 SP_multicast (this->spread_mailbox, SAFE_MESS, 464 this->replication_group.c_str (), 454 465 ORIG_MESSAGE_TYPE, strlen (msg), msg); 455 466 // wait here until we have the result … … 471 482 } 472 483 473 void SpreadReplicationBackend::validate (const std::string & bucket,484 void ReplicationBackend::validate (const std::string & bucket, 474 485 const std::string * key, 475 486 const std::string * value) … … 504 515 } 505 516 506 string SpreadReplicationBackend::generate_uuid ()517 string ReplicationBackend::generate_uuid () 507 518 { 508 519 uuid_t uuid; … … 513 524 } 514 525 515 void * SpreadReplicationBackend::start_listener_thread (void * ptr)526 void * ReplicationBackend::start_listener_thread (void * ptr) 516 527 { 517 528 LOG4CXX_INFO (logger, "start_listener_thread: "); 518 ((( SpreadReplicationBackend*)ptr)->listener_thread_run ());529 (((ReplicationBackend*)ptr)->listener_thread_run ()); 519 530 return NULL; 520 531 } 521 532 522 void SpreadReplicationBackend::listener_thread_run ()533 void ReplicationBackend::listener_thread_run () 523 534 { 524 535 struct pollfd fds[] = { {this->spread_mailbox, POLLIN, 0} }; … … 541 552 this->listener_thread_go = false; 542 553 } 543 if (last_flush + 30 < time (0)) 544 { 545 LOG4CXX_DEBUG (logger, "flushing last_uuid=" + last_uuid); 554 if ((last_flush + this->replication_status_flush_frequency) < 555 time (0)) 556 { 557 LOG4CXX_DEBUG (logger, "listener_thread_run: flushing last_uuid=" + 558 this->last_uuid); 546 559 int fd; 547 fd = ::open ("last_uuid", O_RDWR | O_TRUNC | O_CREAT, 560 fd = ::open (this->replication_status_file.c_str (), 561 O_RDWR | O_TRUNC | O_CREAT, 548 562 S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH); 549 563 ::write (fd, this->last_uuid.c_str (), … … 564 578 } 565 579 566 void SpreadReplicationBackend::handle_message ()567 { 568 SpreadReplicationMessage * message = new SpreadReplicationMessage ();580 void ReplicationBackend::handle_message () 581 { 582 ReplicationMessage * message = new ReplicationMessage (); 569 583 message->receive (this->spread_mailbox); 570 int type = message->parse (this-> spread_private_group);584 int type = message->parse (this->replication_private_group); 571 585 if (type == ORIG_MESSAGE_TYPE) 572 586 { … … 583 597 while (!pending_messages.empty ()) 584 598 { 585 SpreadReplicationMessage * drain = pending_messages.front ();599 ReplicationMessage * drain = pending_messages.front (); 586 600 pending_messages.pop (); 587 601 LOG4CXX_DEBUG (logger, string ("handle_message: drain.uuid=") + … … 608 622 { 609 623 // it's a message 610 SpreadReplicationMessage * first_queued = pending_messages.front ();624 ReplicationMessage * first_queued = pending_messages.front (); 611 625 if (first_queued == NULL || 612 626 first_queued->get_uuid () != message->get_uuid ()) … … 645 659 } 646 660 647 void SpreadReplicationBackend::do_message (SpreadReplicationMessage * message)661 void ReplicationBackend::do_message (ReplicationMessage * message) 648 662 { 649 663 string ret; … … 696 710 697 711 // if we sent this message signal to the waiting thread that it's complete 698 if (this-> spread_private_group == message->get_sender ())712 if (this->replication_private_group == message->get_sender ()) 699 713 { 700 714 RWGuard g (this->pending_waits_mutex, false); 701 std::map<std::string, boost::shared_ptr< SpreadReplicationWait> >::iterator715 std::map<std::string, boost::shared_ptr<ReplicationWait> >::iterator 702 716 i = pending_waits.find (message->get_uuid ()); 703 717 if (i != pending_waits.end ()) … … 708 722 } 709 723 710 void SpreadReplicationBackend::request_next (string uuid)724 void ReplicationBackend::request_next (string uuid) 711 725 { 712 726 // we don't want our own message back here... 713 727 SP_multicast (this->spread_mailbox, RELIABLE_MESS | SELF_DISCARD, 714 this-> spread_group.c_str (),728 this->replication_group.c_str (), 715 729 REPLAY_MESSAGE_TYPE, uuid.length (), uuid.c_str ()); 716 730 } -
trunk/thrudoc/src/ReplicationBackend.h
r280 r283 3 3 **/ 4 4 5 #ifndef _ _SPREAD_REPLICIATION_BACKEND_H_6 #define _ _SPREAD_REPLICIATION_BACKEND_H_5 #ifndef _REPLICIATION_BACKEND_H_ 6 #define _REPLICIATION_BACKEND_H_ 7 7 8 8 #if HAVE_LIBMEMCACHED && HAVE_LIBUUID … … 18 18 #include "ThrudocPassthruBackend.h" 19 19 20 class SpreadReplicationWait;21 class SpreadReplicationMessage;20 class ReplicationWait; 21 class ReplicationMessage; 22 22 23 23 /* NOTE: this is not exactly a normal passthrough backend even tho it kinda 24 24 * looks like one. we use the passthrough, but there's spread in between 25 25 * passing things down */ 26 class SpreadReplicationBackend : public ThrudocPassthruBackend26 class ReplicationBackend : public ThrudocPassthruBackend 27 27 { 28 28 public: 29 SpreadReplicationBackend (boost::shared_ptr<ThrudocBackend> backend, 30 const std::string & spread_name, 31 const std::string & spread_private_name, 32 const std::string & spread_group); 33 ~SpreadReplicationBackend (); 29 ReplicationBackend (boost::shared_ptr<ThrudocBackend> backend, 30 const std::string & replication_name, 31 const std::string & replication_private_name, 32 const std::string & replication_group, 33 const std::string & replication_status_file, 34 const int replication_status_flush_frequency); 35 ~ReplicationBackend (); 34 36 35 37 void put (const std::string & bucket, const std::string & key, … … 48 50 static void * start_listener_thread (void * ptr); 49 51 50 std::string spread_name; 51 std::string spread_private_name; 52 std::string spread_group; 53 std::string spread_private_group; 52 std::string replication_name; 53 std::string replication_private_name; 54 std::string replication_group; 55 std::string replication_private_group; 56 std::string replication_status_file; 57 int replication_status_flush_frequency; 54 58 mailbox spread_mailbox; 55 59 pthread_t listener_thread; … … 57 61 bool listener_live; 58 62 std::string last_uuid; 59 std::queue< SpreadReplicationMessage *> pending_messages;60 std::map<std::string, boost::shared_ptr< SpreadReplicationWait> >63 std::queue<ReplicationMessage *> pending_messages; 64 std::map<std::string, boost::shared_ptr<ReplicationWait> > 61 65 pending_waits; 62 66 facebook::thrift::concurrency::ReadWriteMutex pending_waits_mutex; … … 65 69 void listener_thread_run (); 66 70 void handle_message (); 67 void do_message ( SpreadReplicationMessage * message);71 void do_message (ReplicationMessage * message); 68 72 void request_next (std::string message); 69 73 }; … … 71 75 #endif /* HAVE_LIBMEMCACHED */ 72 76 73 #endif 77 #endif /* _REPLICIATION_BACKEND_H_ */ -
trunk/thrudoc/src/app_helpers.cpp
r230 r283 18 18 #include "S3Backend.h" 19 19 #include "SpreadBackend.h" 20 #include "ReplicationBackend.h" 20 21 #include "StatsBackend.h" 21 22 #include "ThrudocBackend.h" … … 174 175 string spread_private_name = 175 176 ConfigManager->read<string>("SPREAD_PRIVATE_NAME", ""); 177 // Spread replication (sorta) passthrough 178 string replication_private_name = 179 ConfigManager->read<string>("REPLICATION_PRIVATE_NAME", ""); 176 180 #if HAVE_LIBSPREAD 177 181 string spread_name = … … 184 188 (new SpreadBackend (backend, spread_name, spread_private_name, 185 189 spread_group)); 190 191 string replication_name = 192 ConfigManager->read<string>("REPLICATION_NAME", "4803"); 193 string replication_group = 194 ConfigManager->read<string>("REPLICATION_GROUP", "thrudoc"); 195 string replication_status_file = 196 ConfigManager->read<string>("REPLICATION_STATUS_FILE", 197 "replication_status"); 198 int replication_status_flush_frequency = 199 ConfigManager->read<int>("REPLICATION_STATUS_FLUSH_FREQUENCY", 30); 200 201 if (!replication_private_name.empty ()) 202 backend = shared_ptr<ThrudocBackend> 203 (new ReplicationBackend (backend, replication_name, 204 replication_private_name, 205 replication_group, 206 replication_status_file, 207 replication_status_flush_frequency)); 186 208 #else 187 209 if (!spread_private_name.empty ()) … … 189 211 LOG4CXX_ERROR (logger, "SPREAD_PRIVATE_NAME supplied, but spread support not complied in"); 190 212 fprintf (stderr, "SPREAD_PRIVATE_NAME supplied, but spread support not complied in\n"); 213 exit (1); 214 } 215 if (!replication_private_name.empty ()) 216 { 217 LOG4CXX_ERROR (logger, "REPLICATION_PRIVATE_NAME supplied, but spread support not complied in"); 218 fprintf (stderr, "REPLCATION_PRIVATE_NAME supplied, but spread support not complied in\n"); 191 219 exit (1); 192 220 }
