Changeset 339

Show
Ignore:
Timestamp:
04/04/08 01:31:30 (8 months ago)
Author:
rm
Message:

move common file logging code to thrucommon (replay needs a pass) and give thrudex replay support

Location:
trunk
Files:
6 added
11 modified
1 copied
1 moved

Legend:

Unmodified
Added
Removed
  • trunk/thrucommon/configure.in

    r304 r339  
    8383fi 
    8484 
     85# boost filesystem 
     86if test "$with_disk" != "no" -o "$with_bdb" != "no"; then 
     87    AC_CHECK_LIB(boost_filesystem, [open], [], 
     88                 AC_MSG_ERROR([You must install boost_filesystem; please install libraries and development files])) 
     89    AC_CHECK_HEADERS([boost/filesystem/path.hpp],, 
     90                     AC_MSG_ERROR([boost/filesystem/path.hpp missing; please add db_cxx development files])) 
     91fi 
     92 
    8593# log4cxx 
    8694AC_CHECK_LIB(log4cxx, [open], [], 
     
    92100AC_C_CONST 
    93101AC_CHECK_FUNCS([clock_gettime fdatasync fsync gettimeofday memset socket strcasecmp strchr strdup]) 
    94 AC_CHECK_HEADERS([netinet/in.h stdlib.h string.h sys/param.h]) 
     102AC_CHECK_HEADERS([netinet/in.h stdlib.h string.h sys/param.h cstdlib]) 
    95103AC_C_INLINE 
    96104AC_FUNC_ALLOCA 
  • trunk/thrucommon/src

    • Property svn:ignore
      •  

        old new  
        77thrucommon.pc 
        88thrurecorder 
         9*.loT 
  • trunk/thrucommon/src/Makefile.am

    r316 r339  
    1515                             ConfigFile.h               \ 
    1616                             Hashing.h                  \ 
     17                             FileLogger.h               \ 
    1718                             memcache++.h               \ 
    1819                             MemcacheHandle.h           \ 
     
    3334                           ConfigFile.cpp                       \ 
    3435                           Hashing.cpp                          \ 
     36                           FileLogger.cpp                       \ 
    3537                           ReplicationRecorder.cpp              \ 
    3638                           Spread.cpp                           \ 
  • trunk/thrudex

    • Property svn:ignore
      •  

        old new  
        88COPYING 
        99INSTALL 
         10libtool 
        1011Makefile 
        1112Makefile.in 
  • trunk/thrudex/configure.in

    r325 r339  
    5858fi 
    5959 
     60# boost filesystem 
     61if test "$with_disk" != "no" -o "$with_bdb" != "no"; then 
     62    AC_CHECK_LIB(boost_filesystem, [open], [], 
     63                 AC_MSG_ERROR([You must install boost_filesystem; please install libraries and development files])) 
     64    AC_CHECK_HEADERS([boost/filesystem/path.hpp],, 
     65                     AC_MSG_ERROR([boost/filesystem/path.hpp missing; please add db_cxx development files])) 
     66fi 
     67 
    6068# log4cxx 
    6169AC_CHECK_LIB(log4cxx, [open], [], 
  • trunk/thrudex/src

    • Property svn:ignore
      •  

        old new  
        11.deps 
        22gen-cpp 
         3.libs 
        34Makefile 
        45Makefile.in 
        56*.swp 
        67thrudex 
         8thrudex_bench 
         9thrudex_replay 
  • trunk/thrudex/src/Makefile.am

    r325 r339  
    11pkgconfigdir = $(libdir)/pkgconfig 
    22 
    3 bin_PROGRAMS = thrudex thrudex_bench 
     3bin_PROGRAMS = thrudex thrudex_bench thrudex_replay 
    44noinst_PROGRAMS = thrudex_bench 
    55lib_LTLIBRARIES = libthrudex.la 
     
    1313                  gen-cpp/Thrudex_types.cpp             \ 
    1414                  gen-cpp/Thrudex_constants.cpp         \ 
     15                  app_helpers.cpp                       \ 
    1516                  ThrudexHandler.cpp                    \ 
     17                  LogBackend.cpp                        \ 
    1618                  ThrudexBackend.cpp                    \ 
    1719                  CLuceneBackend.cpp                    \ 
     
    2224 
    2325thrudex_SOURCES = \ 
    24                   main.cpp 
     26                  thrudex.cpp 
    2527 
    2628 
     
    2931 
    3032LDADD = $(top_builddir)/src/libthrudex.la 
     33 
     34thrudex_replay_CPPFLAGS = -Wall -Igen-cpp $(CURL_CFLAGS) $(MEMCACHED_CFLAGS) $(THRIFTNB_CFLAGS) $(THRUCOMMON_CFLAGS) $(SSL_CFLAGS) $(UUID_CFLAGS) 
     35thrudex_replay_LDADD = $(LDADD) 
     36thrudex_replay_LDFLAGS = -Wall -rpath $(pkglibdir) $(CURL_LIBS) $(MEMCACHED_LIBS) $(THRIFTNB_LIBS) $(THRUCOMMON_LIBS) $(SSL_LIBS) $(UUID_LIBS)  
     37thrudex_replay_SOURCES = \ 
     38                  thrudex_replay.cpp 
    3139 
    3240thrudex_bench_SOURCES = \ 
  • trunk/thrudex/src/thrudex.cpp

    r297 r339  
    3535#include "log4cxx/helpers/exception.h" 
    3636 
     37#include "app_helpers.h" 
    3738#include "ThrudexHandler.h" 
    3839#include "ThrudexBackend.h" 
     
    8384        ConfigManager->readFile( conf_file ); 
    8485 
    85         string index_root   = ConfigManager->read<string>("INDEX_ROOT"); 
     86        PropertyConfigurator::configure(conf_file); 
    8687 
    87         int    thread_count = ConfigManager->read<int>("THREAD_COUNT",3); 
    88         int    server_port  = ConfigManager->read<int>("SERVER_PORT",9099); 
     88        int thread_count = ConfigManager->read<int>("THREAD_COUNT",3); 
     89        int server_port  = ConfigManager->read<int>("SERVER_PORT",9099); 
     90        string which = ConfigManager->read<string>("BACKEND","CLucene"); 
    8991 
    90         PropertyConfigurator::configure(conf_file); 
    91         setlocale(LC_CTYPE, "en_US.utf8");  //unicode support 
    92  
    93         LOG4CXX_INFO(logger, "Starting up"); 
    94  
    95         string which      = ConfigManager->read<string>("BACKEND","CLucene"); 
    96         shared_ptr<ThrudexBackend>    backend(new CLuceneBackend(index_root)); 
    97  
     92        shared_ptr<ThrudexBackend> backend = create_backend (which,  
     93                                                             thread_count); 
    9894 
    9995        if (ConfigManager->read<int>("KEEP_STATS", 0)) 
  • trunk/thrudex/src/thrudex_replay.cpp

    r250 r339  
    99 
    1010#ifdef HAVE_CONFIG_H 
    11 #include "thrudoc_config.h" 
     11#include "thrudex_config.h" 
    1212#endif 
    1313/* hack to work around thrift and log4cxx installing config.h's */ 
     
    4141#include "ConfigFile.h" 
    4242#include "LogBackend.h" 
    43 #include "ThrudocBackend.h" 
    44 #include "ThrudocHandler.h" 
    45 #include "ThrudocHandler.h" 
     43#include "ThrudexBackend.h" 
     44#include "ThrudexHandler.h" 
     45#include "ThrudexHandler.h" 
    4646#include "ThruFileTransport.h" 
    4747 
    4848using namespace boost; 
    49 using namespace thrudoc; 
     49using namespace thrudex; 
    5050using namespace facebook::thrift; 
    5151using namespace facebook::thrift::concurrency; 
     
    5757using namespace std; 
    5858 
    59 LoggerPtr logger (Logger::getLogger ("thrudoc_replay")); 
     59LoggerPtr logger (Logger::getLogger ("thrudex_replay")); 
    6060 
    6161//print usage and die 
    6262inline void usage () 
    6363{ 
    64     cerr<<"thrudoc -f /path/to/thrudoc.conf"<<endl; 
    65     cerr<<"\tor create ~/.thrudoc"<<endl; 
     64    cerr<<"thrudex -f /path/to/thrudex.conf"<<endl; 
     65    cerr<<"\tor create ~/.thrudex"<<endl; 
    6666    cerr<<"\t-nb creates non-blocking server"<<endl; 
    6767    exit (-1); 
     
    7171{ 
    7272    public: 
    73         Replayer (shared_ptr<ThrudocBackend> backend, string current_filename, 
     73        Replayer (shared_ptr<ThrudexBackend> backend, string current_filename, 
    7474                  uint32_t delay_seconds)  
    7575        { 
     
    8383            this->delay_seconds = delay_seconds; 
    8484 
    85             this->handler = shared_ptr<ThrudocHandler> 
    86                 (new ThrudocHandler (this->backend)); 
    87             this->processor = shared_ptr<ThrudocProcessor> 
    88                 (new ThrudocProcessor (this->handler)); 
     85            shared_ptr<ThrudexHandler> handler = shared_ptr<ThrudexHandler> 
     86                (new ThrudexHandler (this->backend)); 
     87            this->processor = shared_ptr<ThrudexProcessor> 
     88                (new ThrudexProcessor (handler)); 
    8989 
    9090            this->last_position_flush = 0; 
     
    105105                } 
    106106            } 
    107             catch (ThrudocException e) 
     107            catch (ThrudexException e) 
    108108            { 
    109109                LOG4CXX_WARN (logger, "last_position unknown, assuming epoch"); 
     
    212212 
    213213        TBinaryProtocolFactory protocol_factory; 
    214         shared_ptr<ThrudocBackend> backend; 
    215         shared_ptr<ThrudocHandler> handler; 
    216         shared_ptr<ThrudocProcessor> processor; 
     214        shared_ptr<ThrudexBackend> backend; 
     215        shared_ptr<ThrudexProcessor> processor; 
    217216        string current_filename; 
    218217        int64_t current_position; 
     
    226225{ 
    227226 
    228     string conf_file = string (getenv ("HOME"))+"/.thrudoc"; 
     227    string conf_file = string (getenv ("HOME"))+"/.thrudex"; 
    229228    bool nonblocking = true; 
    230229 
     
    263262        else  
    264263        { 
    265             ThrudocException e; 
     264            ThrudexException e; 
    266265            e.what = "error opening log index file"; 
    267266            LOG4CXX_ERROR (logger, e.what); 
     
    271270        if (log_filename.empty ()) 
    272271        { 
    273             ThrudocException e; 
     272            ThrudexException e; 
    274273            e.what = "error log index file empty"; 
    275274            LOG4CXX_ERROR (logger, e.what); 
     
    282281        // create our backend 
    283282        string which = ConfigManager->read<string> ("BACKEND", "mysql"); 
    284         shared_ptr<ThrudocBackend> backend = create_backend (which, 1); 
     283        shared_ptr<ThrudexBackend> backend = create_backend (which, 1); 
    285284 
    286285        int32_t delay_seconds =  
  • trunk/thrudoc/src/LogBackend.cpp

    r241 r339  
    4545 
    4646    this->set_backend (backend); 
    47     this->log_directory = log_directory; 
    48     this->num_ops = 0; 
    49     this->max_ops = max_ops; 
    50     this->sync_wait = sync_wait; 
    5147 
    5248    // create our serializer 
     
    5551    msg_client = shared_ptr<ThrudocClient>(new ThrudocClient (msg_protocol)); 
    5652 
    57     // if our log directory doesn't exist, create it 
    58     if (!fs::is_directory (log_directory)) 
    59     { 
    60         try 
    61         { 
    62             fs::create_directories (log_directory); 
    63         } 
    64         catch (exception e) 
    65         { 
    66             LOG4CXX_ERROR (logger, string ("log error: ") + e.what ()); 
    67             throw e; 
    68         } 
    69     } 
    70  
    71     // try opening up the index 
    72     index_file.open (log_directory + "/" + LOG_FILE_PREFIX + "index", ios::in); 
    73  
    74     // get the last log file name 
    75     LOG4CXX_DEBUG (logger, "reading index"); 
    76     string old_log_filename; 
    77     while (index_file.good ()) 
    78     { 
    79         char row[256]; 
    80         index_file.getline (row, 256); 
    81         if (strlen (row)) 
    82         { 
    83             LOG4CXX_DEBUG (logger, string ("    log file=") + row); 
    84             old_log_filename = string (row); 
    85         } 
    86     } 
    87     // we're done reading 
    88     index_file.close (); 
    89     LOG4CXX_INFO (logger, string ("old_log_filename=") + old_log_filename); 
    90  
    91     // our new logfile 
    92     string new_log_filename = get_log_filename (); 
    93     LOG4CXX_INFO (logger, string ("new_log_filename=") + new_log_filename); 
    94  
    95     // if there's an old log 
    96     if (!old_log_filename.empty ()) 
    97     { 
    98         // open the old log 
    99         open_log_client (old_log_filename, true); 
    100         // write a nextLog of the new logfile so that replayers can chain 
    101         send_nextLog (new_log_filename); 
    102     } 
    103     // then open the new log 
    104     open_log_client (new_log_filename, false); 
    105  
    106     // write the new logfile to the index 
    107     index_file.open (log_directory + "/" + LOG_FILE_PREFIX + "index",  
    108                          ios::out | ios::app); 
    109     if (!index_file.is_open ()) 
    110     { 
    111         ThrudocException e; 
    112         e.what = "error opening log index file"; 
    113         LOG4CXX_ERROR (logger, e.what); 
    114         throw e; 
    115     } 
    116  
    117     index_file.write ((new_log_filename + "\n").c_str (),  
    118                       new_log_filename.length () + 1); 
    119  
    120     // make sure our addition makes it to disk 
    121     index_file.flush (); 
    122 } 
    123  
    124 LogBackend::~LogBackend () 
    125 { 
    126     log_transport->close (); 
    127     index_file.close (); 
     53    file_logger = new FileLogger(log_directory, LOG_FILE_PREFIX, max_ops,  
     54                                 sync_wait); 
    12855} 
    12956 
    13057void LogBackend::put (const string & bucket, const string & key, 
    131           const string & value) 
     58                      const string & value) 
    13259{ 
    13360    this->get_backend ()->put (bucket, key, value); 
     
    14067        msg_transport->resetBuffer (); 
    14168 
    142         send_log (raw_msg); 
     69        file_logger->send_log (raw_msg); 
    14370    } 
    14471    catch (TException e) 
     
    16390        msg_transport->resetBuffer (); 
    16491 
    165         send_log (raw_msg); 
     92        file_logger->send_log (raw_msg); 
    16693    } 
    16794    catch (TException e) 
     
    179106    if (op == "roll_log") 
    180107    { 
    181         this->roll_log (); 
     108        file_logger->roll_log (); 
    182109        return "done"; 
    183110    } 
     
    193120            msg_transport->resetBuffer (); 
    194121 
    195             send_log (raw_msg); 
     122            file_logger->send_log (raw_msg); 
    196123        } 
    197124        catch (TException e) 
     
    219146        msg_transport->resetBuffer (); 
    220147 
    221         send_log (raw_msg); 
     148        file_logger->send_log (raw_msg); 
    222149    } 
    223150    catch (TException e) 
     
    244171        msg_transport->resetBuffer (); 
    245172 
    246         send_log (raw_msg); 
     173        file_logger->send_log (raw_msg); 
    247174    } 
    248175    catch (TException e) 
     
    257184    return ret; 
    258185} 
    259  
    260 string LogBackend::get_log_filename () 
    261 { 
    262     char buf[64]; 
    263     sprintf (buf, "%s%d", LOG_FILE_PREFIX, (int)time (NULL)); 
    264     return buf;  
    265 } 
    266  
    267 void LogBackend::open_log_client (string log_filename, bool imediate_sync) 
    268 { 
    269     LOG4CXX_INFO (logger, "open_log_client: log_filename=" + log_filename +  
    270                   ", imediate_sync=" + (imediate_sync ? "true" : "false")); 
    271  
    272     // flush old log file 
    273     if (log_transport) 
    274         log_transport->flush (); 
    275  
    276     // and open up a new one 
    277     log_transport = shared_ptr<ThruFileWriterTransport> 
    278         (new ThruFileWriterTransport (log_directory + "/" + log_filename,  
    279                                       imediate_sync ? 0 : this->sync_wait)); 
    280     shared_ptr<TProtocol> log_protocol (new TBinaryProtocol (log_transport)); 
    281     log_client = shared_ptr<EventLogClient> (new EventLogClient (log_protocol)); 
    282 } 
    283  
    284186 
    285187Event LogBackend::create_event (const string & message) 
     
    309211} 
    310212 
    311 void LogBackend::send_nextLog (string new_log_filename) 
    312 { 
    313     log_client->send_nextLog (new_log_filename); 
    314 } 
    315  
    316 void LogBackend::send_log (string raw_message) 
    317 { 
    318     log_client->send_log (this->create_event (raw_message)); 
    319  
    320     // this is going to be fuzzy b/c of multi-thread, but that's ok 
    321     this->num_ops++; 
    322  
    323     if (this->num_ops >= this->max_ops) 
    324     { 
    325         Guard g(log_mutex);  
    326         if (this->num_ops >= this->max_ops) 
    327         {    
    328             this->roll_log (); 
    329             this->num_ops = 0; 
    330         } 
    331     } 
    332 } 
    333  
    334 void LogBackend::roll_log () 
    335 { 
    336     string new_log_filename = get_log_filename (); 
    337     LOG4CXX_INFO (logger, "roll_log: new logfile=" + new_log_filename); 
    338  
    339     // time for a new file 
    340     // point to it in the old one 
    341     send_nextLog (new_log_filename); 
    342     // and open the new one 
    343     open_log_client (new_log_filename, false); 
    344     // add it to the index 
    345     index_file.write ((new_log_filename + "\n").c_str (),  
    346                       new_log_filename.length () + 1); 
    347     index_file.flush (); 
    348 } 
    349  
    350213#endif /* HAVE_LIBBOOST_FILESYSTEM */ 
  • trunk/thrudoc/src/LogBackend.h

    r241 r339  
    2222 
    2323#include "ThrudocPassthruBackend.h" 
    24 #include "EventLog.h" 
    25 #include "ThruFileTransport.h" 
     24 
     25#include <EventLog.h> 
     26#include <FileLogger.h> 
    2627 
    2728#define LOG_FILE_PREFIX "thrudoc-log." 
     
    3334                    const std::string &log_directory, unsigned int max_ops, 
    3435                    unsigned int sync_wait); 
    35         ~LogBackend (); 
    3636 
    3737        void put (const std::string & bucket, const std::string & key, 
     
    5353        boost::shared_ptr<facebook::thrift::transport::TMemoryBuffer> msg_transport; 
    5454        boost::shared_ptr<thrudoc::ThrudocClient> msg_client; 
     55        Event create_event (const std::string &msg); 
    5556 
    56         // this will be used to write to the log file 
    57         boost::shared_ptr<ThruFileWriterTransport> log_transport; 
    58         boost::shared_ptr<EventLogClient> log_client; 
    59  
    60         facebook::thrift::concurrency::Mutex log_mutex; 
    61         std::string log_directory; 
    62         boost::filesystem::fstream index_file; 
    63         unsigned int num_ops; 
    64         unsigned int max_ops; 
    65         unsigned int sync_wait; 
    66  
    67         std::string get_log_filename (); 
    68         void open_log_client (std::string log_filename, bool imediate_sync); 
    69         void roll_log (); 
    70         Event create_event (const std::string &msg); 
    71         void send_log (std::string raw_message); 
    72         void send_nextLog (std::string new_log_filename); 
     57        FileLogger * file_logger; 
    7358}; 
    7459 
  • trunk/thrudoc/src/thrudoc.cpp

    r218 r339  
    1717#include <concurrency/PosixThreadFactory.h> 
    1818#include <protocol/TBinaryProtocol.h> 
     19#include <protocol/TCountingProtocol.h> 
    1920#include <server/TSimpleServer.h> 
    2021#include <server/TThreadPoolServer.h> 
     
    9394        //Startup Services 
    9495        shared_ptr<TProtocolFactory> 
    95             protocolFactory (new TBinaryProtocolFactory ()); 
     96            protF (new TBinaryProtocolFactory ()); 
     97        shared_ptr<TProtocolFactory> 
     98            protocolFactory (new TCountingProtocolFactory (protF)); 
    9699 
    97100        string which = ConfigManager->read<string> ("BACKEND", "mysql"); 
  • trunk/thrudoc/src/thrudoc_replay.cpp

    r250 r339  
    8383            this->delay_seconds = delay_seconds; 
    8484 
    85             this->handler = shared_ptr<ThrudocHandler> 
     85            shared_ptr<ThrudocHandler> handler = shared_ptr<ThrudocHandler> 
    8686                (new ThrudocHandler (this->backend)); 
    8787            this->processor = shared_ptr<ThrudocProcessor> 
    88                 (new ThrudocProcessor (this->handler)); 
     88                (new ThrudocProcessor (handler)); 
    8989 
    9090            this->last_position_flush = 0; 
     
    213213        TBinaryProtocolFactory protocol_factory; 
    214214        shared_ptr<ThrudocBackend> backend; 
    215         shared_ptr<ThrudocHandler> handler; 
    216215        shared_ptr<ThrudocProcessor> processor; 
    217216        string current_filename;