hce-node application
1.4.3
HCE Hierarchical Cluster Engine node application
|
#include <DataReducerProxy.hpp>
Public Member Functions | |
DataReducerProxy (std::string name, zmq::context_t &ctx, const std::string &clientIdentity, unsigned char nodeMode, int64_t startedAt, unsigned char logPriority, const std::string &iniFile) | |
~DataReducerProxy () | |
Public Member Functions inherited from HCE::handlers::Handler | |
Handler (const std::string &name, zmq::context_t &context, const std::string &clientIdentity, const std::string &connectionString, int64_t startedAt, unsigned char logPriority, const std::string &iniFile="") | |
virtual | ~Handler () |
void | shutdown (void) |
Get handler instance name. | |
std::string | getName (void) |
Public Attributes | |
const int | TTL_TASKS_CLEANUP_INTERVAL_DEFAULT = 10000 |
const int | TTL_TASKS_CLEANUP_NUMBER_DEFAULT = 100 |
const int | TTL_TASKS_QUEUE_CLEANUP_NUMBER_DEFAULT = 1000 |
const int | TTL_TASKS_QUEUE_CLEANUP_TIME = 200 |
Protected Member Functions | |
int | initialize (void) |
void | deinitialize (void) |
void * | process (void) |
void | handleInternalMessage (void) |
std::string | processAdminCommands (std::string &command, std::string &parameters) |
Reducer FO initialization | |
void | reducerInit (void) |
Plain text reducer part. Creates response item to accumulate all responses from registered replica nodes. | |
void | defaultReducerResponseItemCreate (std::string &messageId, std::string &messageBody) |
Plain text reducer part. Accumulates response messages from all registered replica nodes in the corresponding response item according to the message Id. | |
std::string | defaultReducerResponseItemReduce (std::map< std::string, DefaultReducerResponseItem * >::iterator &DefaultReducerResponseItemIterator) |
Plain text messages reducer. Process plain text messages for reduce task. | |
void | defaultReducerReducePlainText (std::string &messageId, std::string &messageBody) |
Structured messages reducer. Process Json-based messages like Sphinx search results for reduce task. | |
void | defaultReducerReduceJson (std::string &messageId, std::string &messageBody) |
Structured messages reducer involved extended reducer FO. Process Json-based messages like Sphinx search results for reduce task. | |
void | reduceJson (std::string &messageId, std::string &messageBody) |
TTL tasks queue cleanup for reduceJson. | |
void | reducerTasksCleanupByTtl (void) |
Get property information. | |
std::string | getPropertyInformation (void) |
ZMQ sockets. | |
void | setStatRequests (unsigned int statRequests) |
unsigned int | getStatRequests (void) |
void | setStatResponses (unsigned int statResponses) |
unsigned int | getStatResponses (void) |
Number of connected and registered clients. | |
void | setClientsNumber (unsigned int clientsNumber) |
unsigned int | getClientsNumber (void) |
Node mode: router, shard manager, replica manager or replica. | |
void | setNodeMode (unsigned char nodeMode) |
unsigned char | getNodeMode (void) |
Time when ttl terminated tasks queue will be cleaned partially. | |
Protected Member Functions inherited from HCE::handlers::Handler | |
void | setName (const std::string &name) |
Set handler instance name. | |
void | setClientIdentity (const std::string &clientIdentity) |
Get client ID. | |
std::string | getClientIdentity (void) |
Set start time mark. | |
void | setStartedAt (int64_t startedAt) |
Get start time mark. | |
int64_t | getStartedAt (void) |
Set log priority level. | |
void | setLogPriority (unsigned int logPriority) |
Get log priority level. | |
unsigned int | getLogPriority (void) |
Set ini file name. | |
void | setIniFile (const std::string &iniFile) |
Get ini file name. | |
std::string | getIniFile (void) |
Set ZMQ connections rebuild tries max number. | |
void | setBindTriesMaxNumber (const unsigned int &bindTriesMaxNumber) |
Get ZMQ connections rebuild tries max number. | |
unsigned int | getBindTriesMaxNumber (void) |
Set ZMQ connections rebuild tries delay. | |
void | setBindTriesDelay (const unsigned int &bindTriesDelay) |
Get ZMQ connections rebuild tries delay. | |
unsigned int | getBindTriesDelay (void) |
Set ZMQ connection string. | |
void | setConnectionString (const std::string &connectionString) |
Get ZMQ connection string. | |
std::string | getConnectionString (void) |
Set flag for polling condition. | |
void | setInProgress (bool in_progress) |
Get flag for polling condition. | |
bool | getInProgress (void) const |
Set poll timeout value. | |
void | setPollTimeout (unsigned int pollTimeout) |
Get poll timeout value. | |
unsigned int | getPollTimeout (void) |
Set interval between resends of property values. | |
void | setPropertyInterval (unsigned int propertyInterval) |
Get interval between resends of property values. | |
unsigned int | getPropertyInterval (void) |
Set interval between dump run-time variables. | |
void | setDumpInterval (unsigned int dumpInterval) |
Get interval between dump run-time variables. | |
unsigned int | getDumpInterval (void) |
Set flag allowing run-time variables to be restored from the dump. | |
void | setDumpAllowRestore (unsigned int dumpAllowRestore) |
Get flag allowing run-time variables to be restored from the dump. | |
unsigned int | getDumpAllowRestore (void) |
void | logFlush (unsigned int priority=NODE_LOG_MODE_DEFAULT) |
Node init from file. | |
void | initHandler (const std::string &iniFile, LoggerStream &logger) |
void | handleAdminMessage (void) |
void | processAdminMessage (zmsg &msg) |
void | rebuildAdminClientSocket (void) |
bool | rebuildServerConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type) |
void | rebuildClientConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type, const std::string &identity="") |
bool | rebuildClientConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type, const std::string &identity, int linger, const std::string &readyMessage="", const std::string &byeMessage=NODE_MSG_BYE) |
void | sendByeMessage (zmq::socket_t *&sock_p, const std::string &identity="", const std::string &byeMessage=NODE_MSG_BYE) |
std::string | getProperties (void) |
Get property information. | |
void | sendPeriodicProperties (void) |
Safe create directories if necessary. | |
void | createDirectories (const std::string &name) |
Get file name used for dump and restore of property values. | |
std::string | getDumpFileName (void) |
Get file name used for dump and restore of statistic values. | |
std::string | getStatFileName (void) |
Dump properties variables. | |
bool | dumpProperties (void) |
Dump run-time variables at regular intervals. | |
bool | dumpRunTimeVariables (void) |
void | handleExternalMessage (void) |
void | processControlMessage (zmsg &msg) |
void | processDataMessage (zmsg &msg) |
void | logPeriodicStatistics (void) |
void | heartbit (void) |
void | registerClient (const std::string &identity) |
Admin socket. |
Static Protected Member Functions | |
static void | defaultReducerResponseItemAccumulate (std::map< std::string, DefaultReducerResponseItem * >::iterator &DefaultReducerResponseItemIterator, std::string &messageBody) |
Plain text reducer part. Reduces data accumulated. Removes duplicates and sorts. |
Protected Attributes | |
zmq::socket_t * | _inprocReducerInSock |
zmq::socket_t * | _inprocReducerOutSock |
Statistical variables. | |
int64_t | _reducerTtlTasksCleanupAt |
HCE::reduce::ReducerFunctionalObject * | _extendedReducerFO |
std::map< std::string, DefaultReducerResponseItem * > | _responses |
Accumulator map for plain text messages | |
Protected Attributes inherited from HCE::handlers::Handler | |
zmq::socket_t * | _inprocAdminSock |
Storage for run-time property variables. | |
HandlerProperties | handlerProperties |
Storage for run-time statistic variables. | |
HandlerProperties | statProperties |
ZMQ context. | |
zmq::context_t & | _context |
std::bitset< 4 > | _logMask |
std::string | _logTimeFormat |
std::stringstream | log |
LoggerStream | logger |
Main loop of sockets polling condition. | |
bool | _inProgress |
Send out periodical property values to Admin class at regular intervals. | |
int64_t | _propertyIntervalAt |
Dump run-time variables at regular intervals. | |
int64_t | _dumpIntervalAt |
Dump dir name. | |
std::string | _dumpDir |
Additional Inherited Members | |
Static Public Member Functions inherited from HCE::handlers::Handler | |
static void * | main (void *that) |
Definition at line 40 of file DataReducerProxy.hpp.
HCE::handlers::DataReducerProxy::DataReducerProxy | ( | std::string | name, |
zmq::context_t & | ctx, | ||
const std::string & | clientIdentity, | ||
unsigned char | nodeMode, | ||
int64_t | startedAt, | ||
unsigned char | logPriority, | ||
const std::string & | iniFile | ||
) |
HCE::handlers::DataReducerProxy::~DataReducerProxy | ( | ) |
|
protected |
Structured messages reducer involved extended reducer FO. Process Json-based messages like Sphinx search results for reduce task.
|
protected |
Structured messages reducer. Process Json-based messages like Sphinx search results for reduce task.
<Init output message with Id
<Check in queries messages accumulator map
<Request not found, create new responses item
<Accumulate message
<If accumulated messages number is equal to clients number - reduce task
<Reduce all accumulated messages and get resulted context
<Add message body
<Send response message to server
<Delete responses item
<Delete responses map item
Definition at line 158 of file DataReducerProxy.cpp.
|
static protected |
Plain text reducer part. Reduces data accumulated. Removes duplicates and sorts.
<Request found, accumulate next message, inc counter
Definition at line 135 of file DataReducerProxy.cpp.
|
protected |
Plain text reducer part. Accumulates response messages from all registered replica nodes in the corresponding response item according to the message Id.
<Accumulate message body
<Set messages counter
<Insert new item in to the responses map
Definition at line 121 of file DataReducerProxy.cpp.
|
protected |
Plain text messages reducer. Process plain text messages for reduce task.
<Convert string buffer to vector of "unsigned long long"
<Sort vector
<Remove duplicated items
<Convert from vector to string
Definition at line 141 of file DataReducerProxy.cpp.
|
protected virtual |
Implements HCE::handlers::Handler.
Definition at line 53 of file DataReducerProxy.cpp.
|
protected |
Node mode: router, shard manager, replica manager or replica.
Definition at line 97 of file DataReducerProxy.cpp.
|
protected |
Time when ttl terminated tasks queue will be cleaned partially.
Definition at line 105 of file DataReducerProxy.cpp.
|
protected virtual |
ZMQ sockets.
Reimplemented from HCE::handlers::Handler.
Definition at line 382 of file DataReducerProxy.cpp.
|
protected |
Definition at line 81 of file DataReducerProxy.cpp.
|
protected |
Number of connected and registered clients.
Definition at line 89 of file DataReducerProxy.cpp.
|
protected |
<Fetch message from inproc_reducer_in
<Fetch fields from message
<If node in SHARD mode (send request to all data nodes) and if clients number > 1 - use Reducer to collect messages and reduce results
<If node "Single client" or REPLICA mode (send request to one data node round-robin way) - just re-send message from one data node to upper level w/o Reducer usage. Add fields back to message
<Send response message to server
Reimplemented from HCE::handlers::Handler.
Definition at line 266 of file DataReducerProxy.cpp.
|
protected virtual |
<Reducer FO initialization
<Rebuild admin client socket
<Create inproc_reducer_in socket
<Bind to inproc_reducer_in socket
<Create inproc_reducer_out socket
<Bind to inproc_reducer_out socket
Implements HCE::handlers::Handler.
Definition at line 20 of file DataReducerProxy.cpp.
|
protected virtual |
<Enter main loop of processing of message
<Init polling pool
<Poll dataclient
<Handle message
<Send periodical property values
<Dump run-time variables at regular intervals
Implements HCE::handlers::Handler.
Definition at line 394 of file DataReducerProxy.cpp.
|
protected virtual |
Reducer FO initialization
<Process commands
<Make property information
<Parse command parameters and fetch log level value
<Unsupported command
Reimplemented from HCE::handlers::Handler.
Definition at line 312 of file DataReducerProxy.cpp.
|
protected |
TTL tasks queue cleanup for reduceJson.
<If serialization error return default result to reducer
<Init output message with Id
<Add message body
<Send response message to server
Definition at line 198 of file DataReducerProxy.cpp.
|
protected |
Plain text reducer part. Creates response item to accumulate all responses from registered replica nodes.
<Create _extendedReducerFO instance
Definition at line 109 of file DataReducerProxy.cpp.
|
protected |
Get property information.
Definition at line 366 of file DataReducerProxy.cpp.
|
protected |
Definition at line 93 of file DataReducerProxy.cpp.
|
protected |
Definition at line 101 of file DataReducerProxy.cpp.
|
protected |
Definition at line 77 of file DataReducerProxy.cpp.
|
protected |
Definition at line 85 of file DataReducerProxy.cpp.
|
protected |
Definition at line 103 of file DataReducerProxy.hpp.
|
protected |
Definition at line 81 of file DataReducerProxy.hpp.
|
protected |
Statistical variables.
Definition at line 84 of file DataReducerProxy.hpp.
|
protected |
Definition at line 100 of file DataReducerProxy.hpp.
|
protected |
Accumulator map for plain text messages
Definition at line 107 of file DataReducerProxy.hpp.
const int HCE::handlers::DataReducerProxy::TTL_TASKS_CLEANUP_INTERVAL_DEFAULT = 10000 |
Definition at line 42 of file DataReducerProxy.hpp.
const int HCE::handlers::DataReducerProxy::TTL_TASKS_CLEANUP_NUMBER_DEFAULT = 100 |
Definition at line 43 of file DataReducerProxy.hpp.
const int HCE::handlers::DataReducerProxy::TTL_TASKS_QUEUE_CLEANUP_NUMBER_DEFAULT = 1000 |
Definition at line 44 of file DataReducerProxy.hpp.
const int HCE::handlers::DataReducerProxy::TTL_TASKS_QUEUE_CLEANUP_TIME = 200 |
Definition at line 45 of file DataReducerProxy.hpp.