|
hce-node application
1.4.3
HCE Hierarchical Cluster Engine node application
|
#include <DataReducerProxy.hpp>


Public Member Functions | |
| DataReducerProxy (std::string name, zmq::context_t &ctx, const std::string &clientIdentity, unsigned char nodeMode, int64_t startedAt, unsigned char logPriority, const std::string &iniFile) | |
| ~DataReducerProxy () | |
Public Member Functions inherited from HCE::handlers::Handler | |
| Handler (const std::string &name, zmq::context_t &context, const std::string &clientIdentity, const std::string &connectionString, int64_t startedAt, unsigned char logPriority, const std::string &iniFile="") | |
| virtual | ~Handler () |
| void | shutdown (void) |
| Get handler instance name. | |
| std::string | getName (void) |
Public Attributes | |
| const int | TTL_TASKS_CLEANUP_INTERVAL_DEFAULT = 10000 |
| const int | TTL_TASKS_CLEANUP_NUMBER_DEFAULT = 100 |
| const int | TTL_TASKS_QUEUE_CLEANUP_NUMBER_DEFAULT = 1000 |
| const int | TTL_TASKS_QUEUE_CLEANUP_TIME = 200 |
Protected Member Functions | |
| int | initialize (void) |
| void | deinitialize (void) |
| void * | process (void) |
| void | handleInternalMessage (void) |
| std::string | processAdminCommands (std::string &command, std::string &parameters) |
| Reducer FO initialization | |
| void | reducerInit (void) |
| Plain text reducer part. Creates response item to accumulate all responses from registered replica nodes. | |
| void | defaultReducerResponseItemCreate (std::string &messageId, std::string &messageBody) |
| Plain text reducer part. Accumulates response messages from all registered replica nodes in the corresponding response item according to the message Id. | |
| std::string | defaultReducerResponseItemReduce (std::map< std::string, DefaultReducerResponseItem * >::iterator &DefaultReducerResponseItemIterator) |
| Plain text messages reducer. Process plain text messages for reduce task. | |
| void | defaultReducerReducePlainText (std::string &messageId, std::string &messageBody) |
| Structured messages reducer. Process Json-based messages like Sphinx search results for reduce task. | |
| void | defaultReducerReduceJson (std::string &messageId, std::string &messageBody) |
| Structured messages reducer involving the extended reducer FO. Processes JSON-based messages like Sphinx search results for reduce task. | |
| void | reduceJson (std::string &messageId, std::string &messageBody) |
| TTL tasks queue cleanup for reduceJson. | |
| void | reducerTasksCleanupByTtl (void) |
| Get property information. | |
| std::string | getPropertyInformation (void) |
| ZMQ sockets. | |
| void | setStatRequests (unsigned int statRequests) |
| unsigned int | getStatRequests (void) |
| void | setStatResponses (unsigned int statResponses) |
| unsigned int | getStatResponses (void) |
| Number of connected and registered clients. | |
| void | setClientsNumber (unsigned int clientsNumber) |
| unsigned int | getClientsNumber (void) |
| Node mode: router, shard manager, replica manager or replica. | |
| void | setNodeMode (unsigned char nodeMode) |
| unsigned char | getNodeMode (void) |
| Time when ttl terminated tasks queue will be cleaned partially. | |
Protected Member Functions inherited from HCE::handlers::Handler | |
| void | setName (const std::string &name) |
| Set handler instance name. | |
| void | setClientIdentity (const std::string &clientIdentity) |
| Get client ID. | |
| std::string | getClientIdentity (void) |
| Set start time mark. | |
| void | setStartedAt (int64_t startedAt) |
| Get start time mark. | |
| int64_t | getStartedAt (void) |
| Set log priority level. | |
| void | setLogPriority (unsigned int logPriority) |
| Get log priority level. | |
| unsigned int | getLogPriority (void) |
| Set ini file name. | |
| void | setIniFile (const std::string &iniFile) |
| Get ini file name. | |
| std::string | getIniFile (void) |
| Set ZMQ connections rebuild tries max number. | |
| void | setBindTriesMaxNumber (const unsigned int &bindTriesMaxNumber) |
| Get ZMQ connections rebuild tries max number. | |
| unsigned int | getBindTriesMaxNumber (void) |
| Set ZMQ connections rebuild tries delay. | |
| void | setBindTriesDelay (const unsigned int &bindTriesDelay) |
| Get ZMQ connections rebuild tries delay. | |
| unsigned int | getBindTriesDelay (void) |
| Set ZMQ connection string. | |
| void | setConnectionString (const std::string &connectionString) |
| Get ZMQ connection string. | |
| std::string | getConnectionString (void) |
| Set flag for polling condition. | |
| void | setInProgress (bool in_progress) |
| Get flag for polling condition. | |
| bool | getInProgress (void) const |
| Set poll timeout value. | |
| void | setPollTimeout (unsigned int pollTimeout) |
| Get poll timeout value. | |
| unsigned int | getPollTimeout (void) |
| Set property interval between resend Property values. | |
| void | setPropertyInterval (unsigned int propertyInterval) |
| Get property interval between resend Property values. | |
| unsigned int | getPropertyInterval (void) |
| Set interval between dump run-time variables. | |
| void | setDumpInterval (unsigned int dumpInterval) |
| Get interval between dump run-time variables. | |
| unsigned int | getDumpInterval (void) |
| Set flag for allowed restore variables use dump. | |
| void | setDumpAllowRestore (unsigned int dumpAllowRestore) |
| Get flag for allowed restore variables use dump. | |
| unsigned int | getDumpAllowRestore (void) |
| void | logFlush (unsigned int priority=NODE_LOG_MODE_DEFAULT) |
| Node init from file. | |
| void | initHandler (const std::string &iniFile, LoggerStream &logger) |
| void | handleAdminMessage (void) |
| void | processAdminMessage (zmsg &msg) |
| void | rebuildAdminClientSocket (void) |
| bool | rebuildServerConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type) |
| void | rebuildClientConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type, const std::string &identity="") |
| bool | rebuildClientConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type, const std::string &identity, int linger, const std::string &readyMessage="", const std::string &byeMessage=NODE_MSG_BYE) |
| void | sendByeMessage (zmq::socket_t *&sock_p, const std::string &identity="", const std::string &byeMessage=NODE_MSG_BYE) |
| std::string | getProperties (void) |
| Get property information. | |
| void | sendPeriodicProperties (void) |
| Safe create directories if necessary. | |
| void | createDirectories (const std::string &name) |
| Get file name used for dump and restore of properties values. | |
| std::string | getDumpFileName (void) |
| Get file name used for dump and restore of statistic values. | |
| std::string | getStatFileName (void) |
| Dump properties variables. | |
| bool | dumpProperties (void) |
| Dump run-time variables at regular intervals. | |
| bool | dumpRunTimeVariables (void) |
| void | handleExternalMessage (void) |
| void | processControlMessage (zmsg &msg) |
| void | processDataMessage (zmsg &msg) |
| void | logPeriodicStatistics (void) |
| void | heartbit (void) |
| void | registerClient (const std::string &identity) |
| Admin socket. | |
Static Protected Member Functions | |
| static void | defaultReducerResponseItemAccumulate (std::map< std::string, DefaultReducerResponseItem * >::iterator &DefaultReducerResponseItemIterator, std::string &messageBody) |
| Plain text reducer part. Reduces data accumulated. Removes duplicates and sorts. | |
Protected Attributes | |
| zmq::socket_t * | _inprocReducerInSock |
| zmq::socket_t * | _inprocReducerOutSock |
| Statistical variables. | |
| int64_t | _reducerTtlTasksCleanupAt |
| HCE::reduce::ReducerFunctionalObject * | _extendedReducerFO |
| std::map< std::string, DefaultReducerResponseItem * > | _responses |
| Accumulator map for plain text messages. | |
Protected Attributes inherited from HCE::handlers::Handler | |
| zmq::socket_t * | _inprocAdminSock |
| Storage for run-time property variables. | |
| HandlerProperties | handlerProperties |
| Storage for run-time statistic variables. | |
| HandlerProperties | statProperties |
| ZMQ context. | |
| zmq::context_t & | _context |
| std::bitset< 4 > | _logMask |
| std::string | _logTimeFormat |
| std::stringstream | log |
| LoggerStream | logger |
| Main loop of sockets polling condition. | |
| bool | _inProgress |
| Send out periodical property values to Admin class at regular intervals. | |
| int64_t | _propertyIntervalAt |
| Dump run-time variables at regular intervals. | |
| int64_t | _dumpIntervalAt |
| Dump dir name. | |
| std::string | _dumpDir |
Additional Inherited Members | |
Static Public Member Functions inherited from HCE::handlers::Handler | |
| static void * | main (void *that) |
Definition at line 40 of file DataReducerProxy.hpp.
| HCE::handlers::DataReducerProxy::DataReducerProxy | ( | std::string | name, |
| zmq::context_t & | ctx, | ||
| const std::string & | clientIdentity, | ||
| unsigned char | nodeMode, | ||
| int64_t | startedAt, | ||
| unsigned char | logPriority, | ||
| const std::string & | iniFile | ||
| ) |
| HCE::handlers::DataReducerProxy::~DataReducerProxy | ( | ) |
|
protected |
Structured messages reducer involving the extended reducer FO. Processes JSON-based messages like Sphinx search results for reduce task.
|
protected |
Structured messages reducer. Process Json-based messages like Sphinx search results for reduce task.
<Init output message with Id
<Check in queries messages accumulator map
<Request not found, create new responses item
<Accumulate message
<If accumulated messages number is equal to clients number - reduce task
<Reduce all accumulated messages and get resulted context
<Add message body
<Send response message to server
<Delete responses item
<Delete responses map item
Definition at line 158 of file DataReducerProxy.cpp.


|
static protected |
Plain text reducer part. Reduces data accumulated. Removes duplicates and sorts.
<Request found, accumulate next message, inc counter
Definition at line 135 of file DataReducerProxy.cpp.

|
protected |
Plain text reducer part. Accumulates response messages from all registered replica nodes in the corresponding response item according to the message Id.
<Accumulate message body
<Set messages counter
<Insert new item in to the responses map
Definition at line 121 of file DataReducerProxy.cpp.


|
protected |
Plain text messages reducer. Process plain text messages for reduce task.
<Convert string buffer to vector of "unsigned long long"
<Sort vector
<Remove duplicated items
<Convert from vector to string
Definition at line 141 of file DataReducerProxy.cpp.

|
protected virtual |
Implements HCE::handlers::Handler.
Definition at line 53 of file DataReducerProxy.cpp.


|
protected |
Node mode: router, shard manager, replica manager or replica.
Definition at line 97 of file DataReducerProxy.cpp.


|
protected |
Time when ttl terminated tasks queue will be cleaned partially.
Definition at line 105 of file DataReducerProxy.cpp.


|
protected virtual |
ZMQ sockets.
Reimplemented from HCE::handlers::Handler.
Definition at line 382 of file DataReducerProxy.cpp.


|
protected |
Definition at line 81 of file DataReducerProxy.cpp.


|
protected |
Number of connected and registered clients.
Definition at line 89 of file DataReducerProxy.cpp.


|
protected |
<Fetch message from inproc_reducer_in
<Fetch fields from message
<If node in SHARD mode (send request to all data nodes) and if clients number > 1 - use Reducer to collect messages and reduce results
<If node "Single client" or REPLICA mode (send request to one data node round-robin way) - just re-send message from one data node to upper level w/o Reducer usage
<Add fields back to message
<Send response message to server
Reimplemented from HCE::handlers::Handler.
Definition at line 266 of file DataReducerProxy.cpp.


|
protected virtual |
<Reducer FO initialization
<Rebuild admin client socket
<Create inproc_reducer_in socket
<Bind to inproc_reducer_in socket
<Create inproc_reducer_out socket
<Bind to inproc_reducer_out socket
Implements HCE::handlers::Handler.
Definition at line 20 of file DataReducerProxy.cpp.


|
protected virtual |
<Enter main loop of processing of message
<Init polling pool
<Poll dataclient
<Handle message
<Send periodical property values
<Dump run-time variables at regular intervals
Implements HCE::handlers::Handler.
Definition at line 394 of file DataReducerProxy.cpp.

|
protected virtual |
Reducer FO initialization
<Process commands
<Make property information
<Parse command parameters and fetch log level value
<Unsupported command
Reimplemented from HCE::handlers::Handler.
Definition at line 312 of file DataReducerProxy.cpp.

|
protected |
TTL tasks queue cleanup for reduceJson.
<If serialization error return default result to reducer
<Init output message with Id
<Add message body
<Send response message to server
Definition at line 198 of file DataReducerProxy.cpp.


|
protected |
Plain text reducer part. Creates response item to accumulate all responses from registered replica nodes.
<Create _extendedReducerFO instance
Definition at line 109 of file DataReducerProxy.cpp.


|
protected |
Get property information.
Definition at line 366 of file DataReducerProxy.cpp.


|
protected |
Definition at line 93 of file DataReducerProxy.cpp.


|
protected |
Definition at line 101 of file DataReducerProxy.cpp.


|
protected |
Definition at line 77 of file DataReducerProxy.cpp.


|
protected |
Definition at line 85 of file DataReducerProxy.cpp.


|
protected |
Definition at line 103 of file DataReducerProxy.hpp.
|
protected |
Definition at line 81 of file DataReducerProxy.hpp.
|
protected |
Statistical variables.
Definition at line 84 of file DataReducerProxy.hpp.
|
protected |
Definition at line 100 of file DataReducerProxy.hpp.
|
protected |
<Accumulator map for plain text messages
Definition at line 107 of file DataReducerProxy.hpp.
| const int HCE::handlers::DataReducerProxy::TTL_TASKS_CLEANUP_INTERVAL_DEFAULT = 10000 |
Definition at line 42 of file DataReducerProxy.hpp.
| const int HCE::handlers::DataReducerProxy::TTL_TASKS_CLEANUP_NUMBER_DEFAULT = 100 |
Definition at line 43 of file DataReducerProxy.hpp.
| const int HCE::handlers::DataReducerProxy::TTL_TASKS_QUEUE_CLEANUP_NUMBER_DEFAULT = 1000 |
Definition at line 44 of file DataReducerProxy.hpp.
| const int HCE::handlers::DataReducerProxy::TTL_TASKS_QUEUE_CLEANUP_TIME = 200 |
Definition at line 45 of file DataReducerProxy.hpp.