Changeset 339
- Timestamp:
- 04/04/08 01:31:30 (8 months ago)
- Location:
- trunk
- Files:
-
- 6 added
- 11 modified
- 1 copied
- 1 moved
-
thrucommon/configure.in (modified) (2 diffs)
-
thrucommon/src (modified) (1 prop)
-
thrucommon/src/FileLogger.cpp (added)
-
thrucommon/src/FileLogger.h (added)
-
thrucommon/src/Makefile.am (modified) (2 diffs)
-
thrudex (modified) (1 prop)
-
thrudex/configure.in (modified) (1 diff)
-
thrudex/src (modified) (1 prop)
-
thrudex/src/LogBackend.cpp (added)
-
thrudex/src/LogBackend.h (added)
-
thrudex/src/Makefile.am (modified) (4 diffs)
-
thrudex/src/app_helpers.cpp (added)
-
thrudex/src/app_helpers.h (added)
-
thrudex/src/thrudex.cpp (moved) (moved from trunk/thrudex/src/main.cpp) (2 diffs)
-
thrudex/src/thrudex_replay.cpp (copied) (copied from trunk/thrudoc/src/thrudoc_replay.cpp) (11 diffs)
-
thrudoc/src/LogBackend.cpp (modified) (10 diffs)
-
thrudoc/src/LogBackend.h (modified) (3 diffs)
-
thrudoc/src/thrudoc.cpp (modified) (2 diffs)
-
thrudoc/src/thrudoc_replay.cpp (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/thrucommon/configure.in
r304 r339 83 83 fi 84 84 85 # boost filesystem 86 if test "$with_disk" != "no" -o "$with_bdb" != "no"; then 87 AC_CHECK_LIB(boost_filesystem, [open], [], 88 AC_MSG_ERROR([You must install boost_filesystem; please install libraries and development files])) 89 AC_CHECK_HEADERS([boost/filesystem/path.hpp],, 90 AC_MSG_ERROR([boost/filesystem/path.hpp missing; please add db_cxx development files])) 91 fi 92 85 93 # log4cxx 86 94 AC_CHECK_LIB(log4cxx, [open], [], … … 92 100 AC_C_CONST 93 101 AC_CHECK_FUNCS([clock_gettime fdatasync fsync gettimeofday memset socket strcasecmp strchr strdup]) 94 AC_CHECK_HEADERS([netinet/in.h stdlib.h string.h sys/param.h ])102 AC_CHECK_HEADERS([netinet/in.h stdlib.h string.h sys/param.h cstdlib]) 95 103 AC_C_INLINE 96 104 AC_FUNC_ALLOCA -
trunk/thrucommon/src
- Property svn:ignore
-
old new 7 7 thrucommon.pc 8 8 thrurecorder 9 *.loT
-
- Property svn:ignore
-
trunk/thrucommon/src/Makefile.am
r316 r339 15 15 ConfigFile.h \ 16 16 Hashing.h \ 17 FileLogger.h \ 17 18 memcache++.h \ 18 19 MemcacheHandle.h \ … … 33 34 ConfigFile.cpp \ 34 35 Hashing.cpp \ 36 FileLogger.cpp \ 35 37 ReplicationRecorder.cpp \ 36 38 Spread.cpp \ -
trunk/thrudex
- Property svn:ignore
-
old new 8 8 COPYING 9 9 INSTALL 10 libtool 10 11 Makefile 11 12 Makefile.in
-
- Property svn:ignore
-
trunk/thrudex/configure.in
r325 r339 58 58 fi 59 59 60 # boost filesystem 61 if test "$with_disk" != "no" -o "$with_bdb" != "no"; then 62 AC_CHECK_LIB(boost_filesystem, [open], [], 63 AC_MSG_ERROR([You must install boost_filesystem; please install libraries and development files])) 64 AC_CHECK_HEADERS([boost/filesystem/path.hpp],, 65 AC_MSG_ERROR([boost/filesystem/path.hpp missing; please add db_cxx development files])) 66 fi 67 60 68 # log4cxx 61 69 AC_CHECK_LIB(log4cxx, [open], [], -
trunk/thrudex/src
- Property svn:ignore
-
old new 1 1 .deps 2 2 gen-cpp 3 .libs 3 4 Makefile 4 5 Makefile.in 5 6 *.swp 6 7 thrudex 8 thrudex_bench 9 thrudex_replay
-
- Property svn:ignore
-
trunk/thrudex/src/Makefile.am
r325 r339 1 1 pkgconfigdir = $(libdir)/pkgconfig 2 2 3 bin_PROGRAMS = thrudex thrudex_bench 3 bin_PROGRAMS = thrudex thrudex_bench thrudex_replay 4 4 noinst_PROGRAMS = thrudex_bench 5 5 lib_LTLIBRARIES = libthrudex.la … … 13 13 gen-cpp/Thrudex_types.cpp \ 14 14 gen-cpp/Thrudex_constants.cpp \ 15 app_helpers.cpp \ 15 16 ThrudexHandler.cpp \ 17 LogBackend.cpp \ 16 18 ThrudexBackend.cpp \ 17 19 CLuceneBackend.cpp \ … … 22 24 23 25 thrudex_SOURCES = \ 24 main.cpp26 thrudex.cpp 25 27 26 28 … … 29 31 30 32 LDADD = $(top_builddir)/src/libthrudex.la 33 34 thrudex_replay_CPPFLAGS = -Wall -Igen-cpp $(CURL_CFLAGS) $(MEMCACHED_CFLAGS) $(THRIFTNB_CFLAGS) $(THRUCOMMON_CFLAGS) $(SSL_CFLAGS) $(UUID_CFLAGS) 35 thrudex_replay_LDADD = $(LDADD) 36 thrudex_replay_LDFLAGS = -Wall -rpath $(pkglibdir) $(CURL_LIBS) $(MEMCACHED_LIBS) $(THRIFTNB_LIBS) $(THRUCOMMON_LIBS) $(SSL_LIBS) $(UUID_LIBS) 37 thrudex_replay_SOURCES = \ 38 thrudex_replay.cpp 31 39 32 40 thrudex_bench_SOURCES = \ -
trunk/thrudex/src/thrudex.cpp
r297 r339 35 35 #include "log4cxx/helpers/exception.h" 36 36 37 #include "app_helpers.h" 37 38 #include "ThrudexHandler.h" 38 39 #include "ThrudexBackend.h" … … 83 84 ConfigManager->readFile( conf_file ); 84 85 85 string index_root = ConfigManager->read<string>("INDEX_ROOT");86 PropertyConfigurator::configure(conf_file); 86 87 87 int thread_count = ConfigManager->read<int>("THREAD_COUNT",3); 88 int server_port = ConfigManager->read<int>("SERVER_PORT",9099); 88 int thread_count = ConfigManager->read<int>("THREAD_COUNT",3); 89 int server_port = ConfigManager->read<int>("SERVER_PORT",9099); 90 string which = ConfigManager->read<string>("BACKEND","CLucene"); 89 91 90 PropertyConfigurator::configure(conf_file); 91 setlocale(LC_CTYPE, "en_US.utf8"); //unicode support 92 93 LOG4CXX_INFO(logger, "Starting up"); 94 95 string which = ConfigManager->read<string>("BACKEND","CLucene"); 96 shared_ptr<ThrudexBackend> backend(new CLuceneBackend(index_root)); 97 92 shared_ptr<ThrudexBackend> backend = create_backend (which, 93 thread_count); 98 94 99 95 if (ConfigManager->read<int>("KEEP_STATS", 0)) -
trunk/thrudex/src/thrudex_replay.cpp
r250 r339 9 9 10 10 #ifdef HAVE_CONFIG_H 11 #include "thrud oc_config.h"11 #include "thrudex_config.h" 12 12 #endif 13 13 /* hack to work around thrift and log4cxx installing config.h's */ … … 41 41 #include "ConfigFile.h" 42 42 #include "LogBackend.h" 43 #include "Thrud ocBackend.h"44 #include "Thrud ocHandler.h"45 #include "Thrud ocHandler.h"43 #include "ThrudexBackend.h" 44 #include "ThrudexHandler.h" 45 #include "ThrudexHandler.h" 46 46 #include "ThruFileTransport.h" 47 47 48 48 using namespace boost; 49 using namespace thrud oc;49 using namespace thrudex; 50 50 using namespace facebook::thrift; 51 51 using namespace facebook::thrift::concurrency; … … 57 57 using namespace std; 58 58 59 LoggerPtr logger (Logger::getLogger ("thrud oc_replay"));59 LoggerPtr logger (Logger::getLogger ("thrudex_replay")); 60 60 61 61 //print usage and die 62 62 inline void usage () 63 63 { 64 cerr<<"thrud oc -f /path/to/thrudoc.conf"<<endl;65 cerr<<"\tor create ~/.thrud oc"<<endl;64 cerr<<"thrudex -f /path/to/thrudex.conf"<<endl; 65 cerr<<"\tor create ~/.thrudex"<<endl; 66 66 cerr<<"\t-nb creates non-blocking server"<<endl; 67 67 exit (-1); … … 71 71 { 72 72 public: 73 Replayer (shared_ptr<Thrud ocBackend> backend, string current_filename,73 Replayer (shared_ptr<ThrudexBackend> backend, string current_filename, 74 74 uint32_t delay_seconds) 75 75 { … … 83 83 this->delay_seconds = delay_seconds; 84 84 85 this->handler = shared_ptr<ThrudocHandler>86 (new Thrud ocHandler (this->backend));87 this->processor = shared_ptr<Thrud ocProcessor>88 (new Thrud ocProcessor (this->handler));85 shared_ptr<ThrudexHandler> handler = shared_ptr<ThrudexHandler> 86 (new ThrudexHandler (this->backend)); 87 this->processor = shared_ptr<ThrudexProcessor> 88 (new ThrudexProcessor (handler)); 89 89 90 90 this->last_position_flush = 0; … … 105 105 } 106 106 } 107 catch (Thrud ocException e)107 catch (ThrudexException e) 108 108 { 109 109 LOG4CXX_WARN (logger, "last_position unknown, assuming epoch"); … … 212 212 213 213 TBinaryProtocolFactory protocol_factory; 214 shared_ptr<ThrudocBackend> backend; 215 shared_ptr<ThrudocHandler> handler; 216 shared_ptr<ThrudocProcessor> processor; 214 shared_ptr<ThrudexBackend> backend; 215 shared_ptr<ThrudexProcessor> processor; 217 216 string current_filename; 218 217 int64_t current_position; … … 226 225 { 227 226 228 string conf_file = string (getenv ("HOME"))+"/.thrud oc";227 string conf_file = string (getenv ("HOME"))+"/.thrudex"; 229 228 bool nonblocking = true; 230 229 … … 263 262 else 264 263 { 265 Thrud ocException e;264 ThrudexException e; 266 265 e.what = "error opening log index file"; 267 266 LOG4CXX_ERROR (logger, e.what); … … 271 270 if (log_filename.empty ()) 272 271 { 273 Thrud ocException e;272 ThrudexException e; 274 273 e.what = "error log index file empty"; 275 274 LOG4CXX_ERROR (logger, e.what); … … 282 281 // create our backend 283 282 string which = ConfigManager->read<string> ("BACKEND", "mysql"); 284 shared_ptr<Thrud ocBackend> backend = create_backend (which, 1);283 shared_ptr<ThrudexBackend> backend = create_backend (which, 1); 285 284 286 285 int32_t delay_seconds = -
trunk/thrudoc/src/LogBackend.cpp
r241 r339 45 45 46 46 this->set_backend (backend); 47 this->log_directory = log_directory;48 this->num_ops = 0;49 this->max_ops = max_ops;50 this->sync_wait = sync_wait;51 47 52 48 // create our serializer … … 55 51 msg_client = shared_ptr<ThrudocClient>(new ThrudocClient (msg_protocol)); 56 52 57 // if our log directory doesn't exist, create it 58 if (!fs::is_directory (log_directory)) 59 { 60 try 61 { 62 fs::create_directories (log_directory); 63 } 64 catch (exception e) 65 { 66 LOG4CXX_ERROR (logger, string ("log error: ") + e.what ()); 67 throw e; 68 } 69 } 70 71 // try opening up the index 72 index_file.open (log_directory + "/" + LOG_FILE_PREFIX + "index", ios::in); 73 74 // get the last log file name 75 LOG4CXX_DEBUG (logger, "reading index"); 76 string old_log_filename; 77 while (index_file.good ()) 78 { 79 char row[256]; 80 index_file.getline (row, 256); 81 if (strlen (row)) 82 { 83 LOG4CXX_DEBUG (logger, string (" log file=") + row); 84 old_log_filename = string (row); 85 } 86 } 87 // we're done reading 88 index_file.close (); 89 LOG4CXX_INFO (logger, string ("old_log_filename=") + old_log_filename); 90 91 // our new logfile 92 string new_log_filename = get_log_filename (); 93 LOG4CXX_INFO (logger, string ("new_log_filename=") + new_log_filename); 94 95 // if there's an old log 96 if (!old_log_filename.empty ()) 97 { 98 // open the old log 99 open_log_client (old_log_filename, true); 100 // write a nextLog of the new logfile so that replayers can chain 101 send_nextLog (new_log_filename); 102 } 103 // then open the new log 104 open_log_client (new_log_filename, false); 105 106 // write the new logfile to the index 107 index_file.open (log_directory + "/" + LOG_FILE_PREFIX + "index", 108 ios::out | ios::app); 109 if (!index_file.is_open ()) 110 { 111 ThrudocException e; 112 e.what = "error opening log index file"; 113 LOG4CXX_ERROR (logger, e.what); 114 throw e; 115 } 116 117 index_file.write ((new_log_filename + "\n").c_str (), 118 new_log_filename.length () + 1); 119 120 // make sure our addition makes it to disk 121 index_file.flush (); 122 } 123 124 LogBackend::~LogBackend () 125 { 126 log_transport->close (); 127 index_file.close (); 53 file_logger = new FileLogger(log_directory, LOG_FILE_PREFIX, max_ops, 54 sync_wait); 128 55 } 129 56 130 57 void LogBackend::put (const string & bucket, const string & key, 131 const string & value)58 const string & value) 132 59 { 133 60 this->get_backend ()->put (bucket, key, value); … … 140 67 msg_transport->resetBuffer (); 141 68 142 send_log (raw_msg);69 file_logger->send_log (raw_msg); 143 70 } 144 71 catch (TException e) … … 163 90 msg_transport->resetBuffer (); 164 91 165 send_log (raw_msg);92 file_logger->send_log (raw_msg); 166 93 } 167 94 catch (TException e) … … 179 106 if (op == "roll_log") 180 107 { 181 this->roll_log ();108 file_logger->roll_log (); 182 109 return "done"; 183 110 } … … 193 120 msg_transport->resetBuffer (); 194 121 195 send_log (raw_msg);122 file_logger->send_log (raw_msg); 196 123 } 197 124 catch (TException e) … … 219 146 msg_transport->resetBuffer (); 220 147 221 send_log (raw_msg);148 file_logger->send_log (raw_msg); 222 149 } 223 150 catch (TException e) … … 244 171 msg_transport->resetBuffer (); 245 172 246 send_log (raw_msg);173 file_logger->send_log (raw_msg); 247 174 } 248 175 catch (TException e) … … 257 184 return ret; 258 185 } 259 260 string LogBackend::get_log_filename ()261 {262 char buf[64];263 sprintf (buf, "%s%d", LOG_FILE_PREFIX, (int)time (NULL));264 return buf;265 }266 267 void LogBackend::open_log_client (string log_filename, bool imediate_sync)268 {269 LOG4CXX_INFO (logger, "open_log_client: log_filename=" + log_filename +270 ", imediate_sync=" + (imediate_sync ? "true" : "false"));271 272 // flush old log file273 if (log_transport)274 log_transport->flush ();275 276 // and open up a new one277 log_transport = shared_ptr<ThruFileWriterTransport>278 (new ThruFileWriterTransport (log_directory + "/" + log_filename,279 imediate_sync ? 0 : this->sync_wait));280 shared_ptr<TProtocol> log_protocol (new TBinaryProtocol (log_transport));281 log_client = shared_ptr<EventLogClient> (new EventLogClient (log_protocol));282 }283 284 186 285 187 Event LogBackend::create_event (const string & message) … … 309 211 } 310 212 311 void LogBackend::send_nextLog (string new_log_filename)312 {313 log_client->send_nextLog (new_log_filename);314 }315 316 void LogBackend::send_log (string raw_message)317 {318 log_client->send_log (this->create_event (raw_message));319 320 // this is going to be fuzzy b/c of multi-thread, but that's ok321 this->num_ops++;322 323 if (this->num_ops >= this->max_ops)324 {325 Guard g(log_mutex);326 if (this->num_ops >= this->max_ops)327 {328 this->roll_log ();329 this->num_ops = 0;330 }331 }332 }333 334 void LogBackend::roll_log ()335 {336 string new_log_filename = get_log_filename ();337 LOG4CXX_INFO (logger, "roll_log: new logfile=" + new_log_filename);338 339 // time for a new file340 // point to it in the old one341 send_nextLog (new_log_filename);342 // and open the new one343 open_log_client (new_log_filename, false);344 // add it to the index345 index_file.write ((new_log_filename + "\n").c_str (),346 new_log_filename.length () + 1);347 index_file.flush ();348 }349 350 213 #endif /* HAVE_LIBBOOST_FILESYSTEM */ -
trunk/thrudoc/src/LogBackend.h
r241 r339 22 22 23 23 #include "ThrudocPassthruBackend.h" 24 #include "EventLog.h" 25 #include "ThruFileTransport.h" 24 25 #include <EventLog.h> 26 #include <FileLogger.h> 26 27 27 28 #define LOG_FILE_PREFIX "thrudoc-log." … … 33 34 const std::string &log_directory, unsigned int max_ops, 34 35 unsigned int sync_wait); 35 ~LogBackend ();36 36 37 37 void put (const std::string & bucket, const std::string & key, … … 53 53 boost::shared_ptr<facebook::thrift::transport::TMemoryBuffer> msg_transport; 54 54 boost::shared_ptr<thrudoc::ThrudocClient> msg_client; 55 Event create_event (const std::string &msg); 55 56 56 // this will be used to write to the log file 57 boost::shared_ptr<ThruFileWriterTransport> log_transport; 58 boost::shared_ptr<EventLogClient> log_client; 59 60 facebook::thrift::concurrency::Mutex log_mutex; 61 std::string log_directory; 62 boost::filesystem::fstream index_file; 63 unsigned int num_ops; 64 unsigned int max_ops; 65 unsigned int sync_wait; 66 67 std::string get_log_filename (); 68 void open_log_client (std::string log_filename, bool imediate_sync); 69 void roll_log (); 70 Event create_event (const std::string &msg); 71 void send_log (std::string raw_message); 72 void send_nextLog (std::string new_log_filename); 57 FileLogger * file_logger; 73 58 }; 74 59 -
trunk/thrudoc/src/thrudoc.cpp
r218 r339 17 17 #include <concurrency/PosixThreadFactory.h> 18 18 #include <protocol/TBinaryProtocol.h> 19 #include <protocol/TCountingProtocol.h> 19 20 #include <server/TSimpleServer.h> 20 21 #include <server/TThreadPoolServer.h> … … 93 94 //Startup Services 94 95 shared_ptr<TProtocolFactory> 95 protocolFactory (new TBinaryProtocolFactory ()); 96 protF (new TBinaryProtocolFactory ()); 97 shared_ptr<TProtocolFactory> 98 protocolFactory (new TCountingProtocolFactory (protF)); 96 99 97 100 string which = ConfigManager->read<string> ("BACKEND", "mysql"); -
trunk/thrudoc/src/thrudoc_replay.cpp
r250 r339 83 83 this->delay_seconds = delay_seconds; 84 84 85 this->handler = shared_ptr<ThrudocHandler>85 shared_ptr<ThrudocHandler> handler = shared_ptr<ThrudocHandler> 86 86 (new ThrudocHandler (this->backend)); 87 87 this->processor = shared_ptr<ThrudocProcessor> 88 (new ThrudocProcessor ( this->handler));88 (new ThrudocProcessor (handler)); 89 89 90 90 this->last_position_flush = 0; … … 213 213 TBinaryProtocolFactory protocol_factory; 214 214 shared_ptr<ThrudocBackend> backend; 215 shared_ptr<ThrudocHandler> handler;216 215 shared_ptr<ThrudocProcessor> processor; 217 216 string current_filename;
