hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
HCE::handlers::DataReducerProxy Class Reference

#include <DataReducerProxy.hpp>

Inheritance diagram for HCE::handlers::DataReducerProxy:
Collaboration diagram for HCE::handlers::DataReducerProxy:

Public Member Functions

 DataReducerProxy (std::string name, zmq::context_t &ctx, const std::string &clientIdentity, unsigned char nodeMode, int64_t startedAt, unsigned char logPriority, const std::string &iniFile)
 ~DataReducerProxy ()
- Public Member Functions inherited from HCE::handlers::Handler
 Handler (const std::string &name, zmq::context_t &context, const std::string &clientIdentity, const std::string &connectionString, int64_t startedAt, unsigned char logPriority, const std::string &iniFile="")
virtual ~Handler ()
void shutdown (void)
 Get handler instance name.
std::string getName (void)

Public Attributes

const int TTL_TASKS_CLEANUP_INTERVAL_DEFAULT = 10000
const int TTL_TASKS_CLEANUP_NUMBER_DEFAULT = 100
const int TTL_TASKS_QUEUE_CLEANUP_NUMBER_DEFAULT = 1000
const int TTL_TASKS_QUEUE_CLEANUP_TIME = 200

Protected Member Functions

int initialize (void)
void deinitialize (void)
void * process (void)
void handleInternalMessage (void)
std::string processAdminCommands (std::string &command, std::string &parameters)
 Reducer FO initialization
void reducerInit (void)
 Plain text reducer part. Creates response item to accumulate all responses from registered replica nodes.
void defaultReducerResponseItemCreate (std::string &messageId, std::string &messageBody)
 Plain text reducer part. Accumulates response messages from all registered replica nodes in corresponded response item according with message Id.
std::string defaultReducerResponseItemReduce (std::map< std::string, DefaultReducerResponseItem * >::iterator &DefaultReducerResponseItemIterator)
 Plain text messages reducer. Process plain text messages for reduce task.
void defaultReducerReducePlainText (std::string &messageId, std::string &messageBody)
 Structured messages reducer. Process Json-based messages like Sphinx search results for reduce task.
void defaultReducerReduceJson (std::string &messageId, std::string &messageBody)
 Structured messages reducer involved extended reducer FO. Process Json-based messages like Sphinx search results for reduce task.
void reduceJson (std::string &messageId, std::string &messageBody)
 TTL tasks queue cleanup for reduceJson.
void reducerTasksCleanupByTtl (void)
 Get property information.
std::string getPropertyInformation (void)
 ZMQ sockets.
void setStatRequests (unsigned int statRequests)
unsigned int getStatRequests (void)
void setStatResponses (unsigned int statResponses)
unsigned int getStatResponses (void)
 Number of connected and registered clients.
void setClientsNumber (unsigned int clientsNumber)
unsigned int getClientsNumber (void)
 Node mode: router, shard manager, replica manager or replica.
void setNodeMode (unsigned char nodeMode)
unsigned char getNodeMode (void)
 Time when ttl terminated tasks queue will be cleaned partially.
- Protected Member Functions inherited from HCE::handlers::Handler
void setName (const std::string &name)
 <Set handler instance name
void setClientIdentity (const std::string &clientIdentity)
 Get client ID.
std::string getClientIdentity (void)
 Set start time mark.
void setStartedAt (int64_t startedAt)
 Get start time mark.
int64_t getStartedAt (void)
 Set log priority level.
void setLogPriority (unsigned int logPriority)
 Get log priority level.
unsigned int getLogPriority (void)
 Set ini file name.
void setIniFile (const std::string &iniFile)
 Get ini file name.
std::string getIniFile (void)
 Set ZMQ connections rebuild tries max number.
void setBindTriesMaxNumber (const unsigned int &bindTriesMaxNumber)
 Get ZMQ connections rebuild tries max number.
unsigned int getBindTriesMaxNumber (void)
 Set ZMQ connections rebuild tries delay.
void setBindTriesDelay (const unsigned int &bindTriesDelay)
 Get ZMQ connections rebuild tries delay.
unsigned int getBindTriesDelay (void)
 Set ZMQ connection string.
void setConnectionString (const std::string &connectionString)
 Get ZMQ connection string.
std::string getConnectionString (void)
 Set flag for polling condition.
void setInProgress (bool in_progress)
 Get flag for polling condition.
bool getInProgress (void) const
 Set poll timeout value.
void setPollTimeout (unsigned int pollTimeout)
 Get poll timeout value.
unsigned int getPollTimeout (void)
 Set property interval between resend Property values.
void setPropertyInterval (unsigned int propertyInterval)
 Get property interval between resend Property values.
unsigned int getPropertyInterval (void)
 Set interval between dump run-time variables.
void setDumpInterval (unsigned int dumpInterval)
 Get interval between dump run-time variables.
unsigned int getDumpInterval (void)
 Set flag for allowed restore variables use dump.
void setDumpAllowRestore (unsigned int dumpAllowRestore)
 Get interval between dump run-time variables.
unsigned int getDumpAllowRestore (void)
void logFlush (unsigned int priority=NODE_LOG_MODE_DEFAULT)
 Node init from file.
void initHandler (const std::string &iniFile, LoggerStream &logger)
void handleAdminMessage (void)
void processAdminMessage (zmsg &msg)
void rebuildAdminClientSocket (void)
bool rebuildServerConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type)
void rebuildClientConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type, const std::string &identity="")
bool rebuildClientConnection (zmq::socket_t *&sock_p, const std::string &connectionString, const int type, const std::string &identity, int linger, const std::string &readyMessage="", const std::string &byeMessage=NODE_MSG_BYE)
void sendByeMessage (zmq::socket_t *&sock_p, const std::string &identity="", const std::string &byeMessage=NODE_MSG_BYE)
std::string getProperties (void)
 Get property information.
void sendPeriodicProperties (void)
 Safe create directories if necessary.
void createDirectories (const std::string &name)
 Get file name use for dump and restore properties values.
std::string getDumpFileName (void)
 Get file name use for dump and restore statistic values.
std::string getStatFileName (void)
 Dump properties variables.
bool dumpProperties (void)
 Dump run-time variables at regular intervals.
bool dumpRunTimeVariables (void)
void handleExternalMessage (void)
void processControlMessage (zmsg &msg)
void processDataMessage (zmsg &msg)
void logPeriodicStatistics (void)
void heartbit (void)
void registerClient (const std::string &identity)
 Admin socket.

Static Protected Member Functions

static void defaultReducerResponseItemAccumulate (std::map< std::string, DefaultReducerResponseItem * >::iterator &DefaultReducerResponseItemIterator, std::string &messageBody)
 Plain text reducer part. Reduces data accumulated. Removes duplicates and sorts.

Protected Attributes

zmq::socket_t_inprocReducerInSock
zmq::socket_t_inprocReducerOutSock
 Statistical variables.
int64_t _reducerTtlTasksCleanupAt
HCE::reduce::ReducerFunctionalObject_extendedReducerFO
std::map< std::string,
DefaultReducerResponseItem * > 
_responses
 <Accumulator map for plain text messages
- Protected Attributes inherited from HCE::handlers::Handler
zmq::socket_t_inprocAdminSock
 Storage for run-time property variables.
HandlerProperties handlerProperties
 Storage for run-time statistic variables.
HandlerProperties statProperties
 ZMQ context.
zmq::context_t_context
std::bitset< 4 > _logMask
std::string _logTimeFormat
std::stringstream log
LoggerStream logger
 Main loop of sockets polling condition.
bool _inProgress
 Send out periodical property values to Admin class at regular intervals.
int64_t _propertyIntervalAt
 Dump run-time variables at regular intervals.
int64_t _dumpIntervalAt
 Dump dir name.
std::string _dumpDir

Additional Inherited Members

- Static Public Member Functions inherited from HCE::handlers::Handler
static void * main (void *that)

Detailed Description

Definition at line 40 of file DataReducerProxy.hpp.

Constructor & Destructor Documentation

HCE::handlers::DataReducerProxy::DataReducerProxy ( std::string  name,
zmq::context_t ctx,
const std::string &  clientIdentity,
unsigned char  nodeMode,
int64_t  startedAt,
unsigned char  logPriority,
const std::string &  iniFile 
)

Definition at line 6 of file DataReducerProxy.cpp.

Here is the call graph for this function:

HCE::handlers::DataReducerProxy::~DataReducerProxy ( )

Definition at line 16 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Member Function Documentation

void HCE::handlers::DataReducerProxy::defaultReducerReduceJson ( std::string &  messageId,
std::string &  messageBody 
)
protected

Structured messages reducer involved extended reducer FO. Process Json-based messages like Sphinx search results for reduce task.

void HCE::handlers::DataReducerProxy::defaultReducerReducePlainText ( std::string &  messageId,
std::string &  messageBody 
)
protected

Structured messages reducer. Process Json-based messages like Sphinx search results for reduce task.

<Init output message with Id

<Check in queries messages accumulator map

<Request not found, create new responses item

<Accumulate message

<If accumulated messages number is equal to clients number - reduce task

<Reduce all accumulated messages and get resulted context

<Add message body

<Send response message to server

<Delete responses item

<Delete responses map item

Definition at line 158 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::defaultReducerResponseItemAccumulate ( std::map< std::string, DefaultReducerResponseItem * >::iterator &  DefaultReducerResponseItemIterator,
std::string &  messageBody 
)
staticprotected

Plain text reducer part. Reduces data accumulated. Removes duplicates and sorts.

<Request found, accumulate next message, inc counter

Definition at line 135 of file DataReducerProxy.cpp.

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::defaultReducerResponseItemCreate ( std::string &  messageId,
std::string &  messageBody 
)
protected

Plain text reducer part. Accumulates response messages from all registered replica nodes in corresponded response item according with message Id.

<Accumulate message body

<Set messages counter

<Insert new item in to the responses map

Definition at line 121 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

std::string HCE::handlers::DataReducerProxy::defaultReducerResponseItemReduce ( std::map< std::string, DefaultReducerResponseItem * >::iterator &  DefaultReducerResponseItemIterator)
protected

Plain text messages reducer. Process plain text messages for reduce task.

<Convert string buffer to vector of "unsigned long long"

<Sort vector

<Remove duplicated items

<Convert from vector to string

Definition at line 141 of file DataReducerProxy.cpp.

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::deinitialize ( void  )
protectedvirtual

Implements HCE::handlers::Handler.

Definition at line 53 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

unsigned int HCE::handlers::DataReducerProxy::getClientsNumber ( void  )
protected

Node mode: router, shard manager, replica manager or replica.

Definition at line 97 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

unsigned char HCE::handlers::DataReducerProxy::getNodeMode ( void  )
protected

Time when ttl terminated tasks queue will be cleaned partially.

Definition at line 105 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

std::string HCE::handlers::DataReducerProxy::getPropertyInformation ( void  )
protectedvirtual

ZMQ sockets.

Reimplemented from HCE::handlers::Handler.

Definition at line 382 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

unsigned int HCE::handlers::DataReducerProxy::getStatRequests ( void  )
protected

Definition at line 81 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

unsigned int HCE::handlers::DataReducerProxy::getStatResponses ( void  )
protected

Number of connected and registered clients.

Definition at line 89 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::handleInternalMessage ( void  )
protected

<Fetch message from inproc_reducer_in

<Fetch fields from message

<If node in SHARD mode (send request to all data nodes) and if clients number > 1 - use Reducer to collect messages and reduce results

<If node "Single client" or REPLICA mode (send request to one data node roud-robin way) - just re-send message from one data node to upper level w/o Reducer usage Add fields back to message

<Send response message to server

Reimplemented from HCE::handlers::Handler.

Definition at line 266 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

int HCE::handlers::DataReducerProxy::initialize ( void  )
protectedvirtual

<Reducer FO initialization

<!<Rebuild admin client socket

<Create inproc_reducer_in socket

<Bind to inproc_reducer_in socket

<Create inproc_reducer_out socket

<Bind to inproc_reducer_out socket

Implements HCE::handlers::Handler.

Definition at line 20 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void * HCE::handlers::DataReducerProxy::process ( void  )
protectedvirtual

<Enter main loop of processing of message

<Init polling pool

<Poll dataclient

<Handle message

<Send periodical property values

<Dump run-time variables at regular intervals

Implements HCE::handlers::Handler.

Definition at line 394 of file DataReducerProxy.cpp.

Here is the call graph for this function:

std::string HCE::handlers::DataReducerProxy::processAdminCommands ( std::string &  command,
std::string &  parameters 
)
protectedvirtual

Reducer FO initialization

<Process commands

<Make property information

<Parse command parameters and fetch log level value

<Unsupported command

Reimplemented from HCE::handlers::Handler.

Definition at line 312 of file DataReducerProxy.cpp.

Here is the call graph for this function:

void HCE::handlers::DataReducerProxy::reduceJson ( std::string &  messageId,
std::string &  messageBody 
)
protected

TTL tasks queue cleanup for reduceJson.

<If serialization error return default result to reducer

<Init output message with Id

<Add message body

<Send response message to server

Definition at line 198 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::reducerInit ( void  )
protected

Plain text reducer part. Creates response item to accumulate all responses from registered replica nodes.

<Create _extendedReducerFO instance

Definition at line 109 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::reducerTasksCleanupByTtl ( void  )
protected

Get property information.

Definition at line 366 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::setClientsNumber ( unsigned int  clientsNumber)
protected

Definition at line 93 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::setNodeMode ( unsigned char  nodeMode)
protected

Definition at line 101 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::setStatRequests ( unsigned int  statRequests)
protected

Definition at line 77 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

void HCE::handlers::DataReducerProxy::setStatResponses ( unsigned int  statResponses)
protected

Definition at line 85 of file DataReducerProxy.cpp.

Here is the call graph for this function:

Here is the caller graph for this function:

Member Data Documentation

HCE::reduce::ReducerFunctionalObject* HCE::handlers::DataReducerProxy::_extendedReducerFO
protected

Definition at line 103 of file DataReducerProxy.hpp.

zmq::socket_t* HCE::handlers::DataReducerProxy::_inprocReducerInSock
protected

Definition at line 81 of file DataReducerProxy.hpp.

zmq::socket_t* HCE::handlers::DataReducerProxy::_inprocReducerOutSock
protected

Statistical variables.

Definition at line 84 of file DataReducerProxy.hpp.

int64_t HCE::handlers::DataReducerProxy::_reducerTtlTasksCleanupAt
protected

Definition at line 100 of file DataReducerProxy.hpp.

std::map<std::string, DefaultReducerResponseItem*> HCE::handlers::DataReducerProxy::_responses
protected

<Accumulator map for plain text messages

Definition at line 107 of file DataReducerProxy.hpp.

const int HCE::handlers::DataReducerProxy::TTL_TASKS_CLEANUP_INTERVAL_DEFAULT = 10000

Definition at line 42 of file DataReducerProxy.hpp.

const int HCE::handlers::DataReducerProxy::TTL_TASKS_CLEANUP_NUMBER_DEFAULT = 100

Definition at line 43 of file DataReducerProxy.hpp.

const int HCE::handlers::DataReducerProxy::TTL_TASKS_QUEUE_CLEANUP_NUMBER_DEFAULT = 1000

Definition at line 44 of file DataReducerProxy.hpp.

const int HCE::handlers::DataReducerProxy::TTL_TASKS_QUEUE_CLEANUP_TIME = 200

Definition at line 45 of file DataReducerProxy.hpp.


The documentation for this class was generated from the following files: