7 unsigned char logPriority,
const std::string& iniFile) :
8 Handler(name, ctx, clientIdentity, std::string(
""), startedAt, logPriority, iniFile),
9 _inprocReducerInSock(nullptr), _inprocReducerOutSock(nullptr),
10 _reducerTtlTasksCleanupAt(TTL_TASKS_CLEANUP_INTERVAL_DEFAULT), _extendedReducerFO(NULL){
137 DefaultReducerResponseItemIterator->second->bodies +=
" " +
messageBody;
138 DefaultReducerResponseItemIterator->second->count++;
143 std::stringstream ss(DefaultReducerResponseItemIterator->second->bodies);
144 std::vector<unsigned long long> items((std::istream_iterator<unsigned long long>(ss)), std::istream_iterator<unsigned long long>());
146 sort(items.begin(), items.end(), std::less<unsigned long long>());
148 items.erase(std::unique(items.begin(), items.end()), items.end());
150 std::ostringstream oss;
151 std::copy(items.begin(), items.end() - 1, std::ostream_iterator<unsigned long long>(oss,
" "));
153 std::string body(oss.str());
160 zmsg msg(messageId.c_str());
163 std::map<std::string, DefaultReducerResponseItem*>::iterator DefaultReducerResponseItemIterator =
_responses.find(messageId);
164 if(DefaultReducerResponseItemIterator ==
_responses.end()){
171 if(DefaultReducerResponseItemIterator->second->count ==
getClientsNumber()){
176 msg.append(body.c_str());
186 logger.
log(
NODE_LOG_MODE_DEBUG) <<
"RESPONSE from plain text reducer to inproc_reducer_out socket for upper server sent, message [" << messageId <<
"], size:" << body.size()
187 <<
" bytes, accumulated:" << DefaultReducerResponseItemIterator->second->count <<
", n=" <<
getStatResponses() <<
flush;
190 delete DefaultReducerResponseItemIterator->second;
193 _responses.erase(DefaultReducerResponseItemIterator);
200 Poco::Checksum checksum(Poco::Checksum::TYPE_CRC32);
201 checksum.update(messageId);
202 unsigned long long itemId = checksum.checksum();
212 Poco::SharedPtr<HCE::reduce_types::ReducingOutputMessage> reducingOutputMessage =
_extendedReducerFO->
reduce(itemId);
214 logger.
log(
NODE_LOG_MODE_DEBUG) <<
"Reducer output json [" << reducingOutputMessage->getBody() <<
"], type: " <<
static_cast<int>(reducingOutputMessage->getProcessingType()) << flush;
217 std::string reducerResultJson;
220 if(reducingOutputMessage->getProcessingErrorCode()){
223 << reducingOutputMessage->getProcessingErrorMsg() <<
flush;
225 reducerResultJson = defaultJson.
getJSON();
228 reducerResultJson = reducingOutputMessage->getBody();
232 HCE::JsonMessageCover msgCover(reducingOutputMessage->getProcessingType(), reducerResultJson, reducingOutputMessage->getTTL());
233 if(!msgCover.serialize(reducerResultJson)){
238 reducerResultJson = defaultJson.
getJSON();
244 zmsg msg(messageId.c_str());
246 msg.
append(reducerResultJson.c_str());
256 logger.
log(
NODE_LOG_MODE_DEBUG) <<
"RESPONSE from json reducer to inproc_reducer_out socket for upper server sent, message [" << messageId <<
"], size:" << reducerResultJson.size()
257 <<
" bytes, items accumulated:" <<
", time:" << reducingOutputMessage->getProcessingElapsedTime()
275 unsigned int clientsNumber = 0;
277 std::string messageId((
const char*)msg.
pop_front().c_str());
279 std::string messageRoute((
const char*)msg.
pop_front().c_str());
280 std::istringstream((
const char*)msg.
pop_front().c_str()) >> clientsNumber;
296 msg.
append(messageId.c_str());
298 msg.
append(messageRoute.c_str());
319 std::stringstream result;
353 std::istringstream(parameters) >> nodeMode;
372 if(tasks > 0 || tasksItems){
383 std::stringstream result;
408 if(items[0].revents & ZMQ_POLLIN){
413 if(items[1].revents & ZMQ_POLLIN){
427 if (EINTR!=error.
num()){
428 std::cout <<
"DataReducerProxy E: " << error.
what() <<
std::endl;
430 }
catch(std::exception& e){
431 std::cout <<
"DataReducerProxy E: " << e.what() <<
std::endl;
433 std::cout <<
"DataReducerProxy E: Unknown exception" <<
std::endl;