hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SphinxFunctionalObject.cpp
Go to the documentation of this file.
1 #include <sstream>
2 #include <unistd.h>
3 #include <Poco/AutoPtr.h>
4 #include <Poco/String.h>
5 #include <Poco/Logger.h>
6 
8 #include "SphinxError.hpp"
10 #include "SphinxSchemaFile.hpp"
13 #include "SphinxMessageConst.hpp"
14 #include "SphinxAdminCommand.hpp"
16 
17 namespace HCE
18 {
19 namespace sphinx
20 {
21 //-----------------------------------------------------------------------------
22 SphinxFunctionalObject::SphinxFunctionalObject(const std::string& nodeName_, const std::string& homeDir_, const std::string& indexName_, bool startSearchd_, bool stopSearchd_)
23 :inherited(), SphinxNodeOptions(nodeName_, 0, homeDir_, indexName_), searcher(*this, startSearchd_, stopSearchd_), indexer(*this),
24  loaded(false), checkIncomingDocuments(false), packIncomingDocuments(false), minNumberFieldsPacking(0),
25  logger(sphinx_search_const::moduleName), messagesCollection(message_const::messages), message(messagesCollection),
26  matchMode(SPH_MATCH_EXTENDED2), sortMode(SPH_SORT_RELEVANCE), rankingMode(SPH_RANK_DEFAULT), rankerExpression("")
27 {
28  if (!homeDir_.empty() && !indexName_.empty())
29  if (loadConfiguration(indexName_))
30  {
31  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::LOAD_CONFIG_SUCCESS)
33  if (startSearchd_)
34  startSearchd();
35  else
36  {
37  _isError = !easyStart();
38  }
39  }
40 }
41 //-----------------------------------------------------------------------------
42 SphinxFunctionalObject::SphinxFunctionalObject(const std::string& nodeName_, unsigned int nodeNumber_, const std::string& homeDir_, const std::string& indexName_, bool startSearchd_, bool stopSearchd_)
43 :inherited(), SphinxNodeOptions(nodeName_, nodeNumber_, homeDir_, indexName_), searcher(*this, startSearchd_, stopSearchd_), indexer(*this),
44  loaded(false), checkIncomingDocuments(false), packIncomingDocuments(false), minNumberFieldsPacking(0),
45  logger(sphinx_search_const::moduleName), messagesCollection(message_const::messages), message(messagesCollection),
46  matchMode(SPH_MATCH_EXTENDED2), sortMode(SPH_SORT_RELEVANCE), rankingMode(SPH_RANK_DEFAULT), rankerExpression("")
47 {
48  if (!homeDir_.empty() && !indexName_.empty())
49  if (loadConfiguration(indexName_))
50  {
51  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::LOAD_CONFIG_SUCCESS) << message(message_const::INDEX, getIndexName())
53  if (startSearchd_)
54  startSearchd();
55  else
56  {
57  _isError = !easyStart();
58  }
59  }
60 }
61 //-----------------------------------------------------------------------------
63 {
64  if (searcher.isNeedStopSearchd())
65 // if (searcher.isActiveSearchd())
66  searcher.stopSearchd();
67 }
68 //-----------------------------------------------------------------------------
70 {
71  logger.resetLogStream();
72 }
73 //-----------------------------------------------------------------------------
75 {
76  logger.setLogStream(os);
77 }
78 //-----------------------------------------------------------------------------
79 std::string SphinxFunctionalObject::logMsg(bool isReset)
80 {
81  return logger.logMsg(isReset);
82 }
83 //-----------------------------------------------------------------------------
85 {
86  return logger.log(logPriority);
87 }
88 //-----------------------------------------------------------------------------
90 {
91  logger.setLoggable(loggable);
92 }
93 //-----------------------------------------------------------------------------
95 {
96  logger.resetLoggable();
97 }
98 //-----------------------------------------------------------------------------
99 void SphinxFunctionalObject::parseHostPort(const std::string& in, std::string& outHost, unsigned int& outPort) throw (std::exception)
100 {
101  size_t pos = in.find(':');
102  if (pos==std::string::npos)
103  {
105  outPort = std::stoul(Poco::trim(in));
106  }
107  else
108  {
109  outHost = Poco::trim(std::string(in, 0, pos));
110  outPort = std::stoul(Poco::trim(std::string(in, pos+1)));
111  }
112 }
113 //-----------------------------------------------------------------------------
114 void SphinxFunctionalObject::setRankingMode(unsigned int rankingMode_, const std::string& rankerExpression_)
115 {
116  rankingMode=rankingMode_;
117  if (!rankerExpression_.empty())
118  rankerExpression = rankerExpression_;
119 }
120 //-----------------------------------------------------------------------------
122 {
123  loaded = false;
124  _isError = false;
125  errorMsg.clear();
127  std::string configFile = getDataDir()+"/"+indexName+"/"+sphinx_search_const::sphinxPropertyFile;
128 
129  try
130  {
131  Poco::AutoPtr<SphinxConfigCreator> pConf = new SphinxConfigCreator(configFile);
132 
133  rankingMode = pConf->getUInt(sphinx_admin_command_const::optionsRanker, SPH_RANK_DEFAULT);
134  rankerExpression = pConf->getString(sphinx_admin_command_const::optionsRankerExpression);
135 
136  const std::string listen = pConf->getString(sphinx_search_const::searchdListen);
137  const std::string maxMatches = pConf->getString(sphinx_search_const::searchdMaxMatches);
138 
139  std::string host;
140  unsigned int port = 0;
141 
142  parseHostPort(listen, host, port);
143 
144  setServerHost((host.empty())?sphinx_search_const::defaultHost:host);
146 
147  std::string file = getDataDir()+"/"+indexName+"/"+sphinx_search_const::sphinxSchemaFile;
148  std::ifstream ifs(file.c_str());
149  if (!ifs.is_open())
150  throw Poco::Exception(message(message_const::FILE_NOT_FOUND, file), ERROR_LOAD_CONFIG);
151 
152  std::vector<std::pair<std::string, int> > weights;
153  SphinxSchemaFile schemaFile(ifs);
154  loaded = schemaFile.getFields(weights);
155  ifs.close();
156  if (!loaded)
157  throw Poco::Exception(schemaFile.getErrorMsg(), ERROR_LOAD_CONFIG);
158 
159  ifs.open(file.c_str());
160  if (!ifs.is_open())
161  throw Poco::Exception(message(message_const::FILE_NOT_OPEN, file), ERROR_LOAD_CONFIG);
162 
163  SphinxSchemaFile schema(ifs);
164  std::vector<std::string> schemaNames;
165  loaded = schema.getNames(schemaNames);
166  ifs.close();
167  if (!loaded)
168  throw Poco::Exception(schemaFile.getErrorMsg(), ERROR_LOAD_CONFIG);
169 
170  const unsigned int attributesCount = schema.getAttributesCount();
171  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::SEARCHD_MAX_MATCHES, maxMatches) << flush;
172  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::SCHEMA_NAMES_SIZE, (unsigned int)schemaNames.size()) << flush;
173  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::ATTRIBUTES_COUNT, attributesCount) << flush;
174 
175  // realloc memory for manager
177 
178  searcher.setFullSchemaNames(schemaNames);
179  loaded = false;
180  if (searcher.setFieldWeights(weights))
181  {
182  weights.clear();
183 
184  std::vector<std::string> keys;
185  pConf->keys(keys);
186 
187  for (size_t i = 0;i<keys.size();++i)
188  {
189  size_t found = keys[i].find(sphinx_search_const::indexNameField);
190  if (found!=std::string::npos)
191  {
192  found = keys[i].find_first_not_of(' ', found+sphinx_search_const::indexNameField.length());
193  if (found!=std::string::npos)
194  {
195  weights.push_back(std::make_pair(keys[i].substr(found), 1));
196  }
197  }
198  }
199  if (weights.empty())
200  throw Poco::Exception(message(message_const::EMPTY_LIST_OF_INDEX_FILES), ERROR_LOAD_CONFIG);
201 
202  loaded = searcher.setIndexWeights(weights);
203  }
204  }
205  catch(Poco::Exception& e)
206  {
207  applyError(e.message(), e.code());
208  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::LOAD_CONFIG_FAIL, errorMsg) << flush;
209  }
210  catch(std::exception& e)
211  {
212  applyError(e.what(), ERROR_LOAD_CONFIG);
213  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::LOAD_CONFIG_FAIL, errorMsg) << flush;
214  }
215  return loaded;
216 }
217 //-----------------------------------------------------------------------------
219 {
220  searcher.resizeResultData(matchesCount, attributesCount);
221 }
222 //-----------------------------------------------------------------------------
224 {
225  return searcher.easyStart(matchMode, sortMode, rankingMode, rankerExpression);
226 }
227 //-----------------------------------------------------------------------------
229 {
230  return searcher.startSearchd();
231 }
232 //-----------------------------------------------------------------------------
234 {
235  return searcher.stopSearchd();
236 }
237 //-----------------------------------------------------------------------------
239 {
240  return searcher.isActiveSearchd();
241 }
242 //-----------------------------------------------------------------------------
244 {
245  return searcher.isConnected();
246 }
247 //-----------------------------------------------------------------------------
249 {
250  searcher.setMatchMode(matchMode);
251  searcher.setRankingMode(rankingMode, rankerExpression);
252  searcher.setSortMode(sortMode);
253  return searcher.open();
254 }
255 //-----------------------------------------------------------------------------
257 {
258  return searcher.close();
259 }
260 //-----------------------------------------------------------------------------
262 {
263  inherited::isError(false);
266  searcher.isError(false);
267  searcher.setErrorMsg("");
268  searcher.setErrorCode(NO_ERROR);
269  indexer.isError(false);
270  indexer.setErrorMsg("");
271  indexer.setErrorCode(NO_ERROR);
272 }
273 //-----------------------------------------------------------------------------
274 void SphinxFunctionalObject::applyError(const std::string& msg, unsigned int code)
275 {
276  _isError = true;
279 }
280 //-----------------------------------------------------------------------------
281 std::string SphinxFunctionalObject::getErrorMsg(void) const
282 {
283  std::string error;
284  if (inherited::isError())
285  error = inherited::getErrorMsg();
286  else if (searcher.isError())
287  error = searcher.getErrorMsg();
288  else if (indexer.isError())
289  error = indexer.getErrorMsg();
290  return error;
291 }
292 //-----------------------------------------------------------------------------
293 unsigned int SphinxFunctionalObject::getErrorCode(void) const
294 {
295  unsigned int code = NO_ERROR;
296  if (inherited::isError())
297  code = inherited::getErrorCode();
298  else if (searcher.isError())
299  code = searcher.getErrorCode();
300  else if (indexer.isError())
301  code = indexer.getErrorCode();
302  return code;
303 }
304 //-----------------------------------------------------------------------------
305 void SphinxFunctionalObject::setServerHost(const std::string& serverHost_)
306 {
307  searcher.setServerHost(serverHost_);
308 }
309 //-----------------------------------------------------------------------------
311 {
312  return searcher.getServerHost();
313 }
314 //-----------------------------------------------------------------------------
316 {
317  searcher.setServerPort(serverPort_);
318 }
319 //-----------------------------------------------------------------------------
321 {
322  return searcher.getServerPort();
323 }
324 //-----------------------------------------------------------------------------
325 bool SphinxFunctionalObject::setIndexFileName(const std::string& indexFileName_)
326 {
327  return searcher.setIndexFileName(indexFileName_);
328 }
329 //-----------------------------------------------------------------------------
331 {
332  return searcher.getIndexFileName();
333 }
334 //-----------------------------------------------------------------------------
335 void SphinxFunctionalObject::setTimeoutedRequests(unsigned long long timeoutedRequests_)
336 {
337  searcher.setTimeoutedRequests(timeoutedRequests_);
338 }
339 //-----------------------------------------------------------------------------
340 unsigned long long SphinxFunctionalObject::getTimeoutedRequests(void) const
341 {
342  return searcher.getTimeoutedRequests();
343 }
344 //-----------------------------------------------------------------------------
346 {
347  searcher.resetTimeoutedRequests();
348 }
349 //-----------------------------------------------------------------------------
350 void SphinxFunctionalObject::setDefaultRequestTimeout(unsigned long defaultRequestTimeout_)
351 {
352  searcher.setDefaultRequestTimeout(defaultRequestTimeout_);
353 }
354 //-----------------------------------------------------------------------------
356 {
357  return searcher.getDefaultRequestTimeout();
358 }
359 //-----------------------------------------------------------------------------
360 std::string SphinxFunctionalObject::Process(const std::string& json)
361 {
362  _isError = false;
363  errorMsg = "";
365  std::string resultData;
366  timeval tvStart, tvStop;
367  gettimeofday(&tvStart, 0); //gettimeofday does not support TZ adjust on Linux.
368  bool needDefaultJson = false;
369  try
370  {
371  SphinxInputJsonMessage inputJsonMessage(json);
372  if (inputJsonMessage.isError())
373  throw Poco::Exception(inputJsonMessage.getErrorMsg(), PARSE_ERROR);
374 
375  if (inputJsonMessage.getType() == SphinxInputJsonMessage::MessageType::mtSearch)
376  {
377  needDefaultJson = true;
378  resultData = searcher.makeSearchCommand(inputJsonMessage.getData());
379  if (resultData.empty())
380  resultData = makeDefaultJSON(message(message_const::EMPTY_RESULT_DATA));
381  }
382  else if (inputJsonMessage.getType() == SphinxInputJsonMessage::MessageType::mtIndex)
383  {
384  indexer.makeIndexCommand(inputJsonMessage.getData());
385  }
386  else if (inputJsonMessage.getType() == SphinxInputJsonMessage::MessageType::mtManage)
387  {
388  indexer.makeAdminCommand(inputJsonMessage.getData(), resultData);
389  }
390  else
391  throw Poco::Exception(message(message_const::INCORRECT_JSON_TYPE), COMMAND_ERROR);
392 
393  }
394  catch(Poco::Exception& e)
395  {
396  applyError(e.message(), e.code());
397  if (needDefaultJson)
398  resultData = makeDefaultJSON(message(message_const::ERROR, getErrorMsg()));
399  Poco::Logger::root().log(e);
400  }
401  catch(std::exception& e)
402  {
403  applyError(e.what(), COMMAND_ERROR);
404  if (needDefaultJson)
405  resultData = makeDefaultJSON(message(message_const::ERROR, getErrorMsg()));
406  Poco::Logger::root().error(e.what());
407  }
408 
409  std::string resJson;
410  gettimeofday(&tvStop, 0);
411 
412  SphinxOutputJsonMessage outputMessage;
413  outputMessage.setErrorCode(getErrorCode());
414  outputMessage.setErrorMessage(getErrorMsg());
415  outputMessage.setData(resultData);
416  outputMessage.setTime(SphinxSearcher::getTimeInterval(tvStart, tvStop)/1000);
417  outputMessage.serialize(resJson);
418 
419  if (outputMessage.isError())
420  log(LoggerStream::Priority::PRIO_DEBUG) << message(message_const::SERIALIZE_OUTPUT_JSON_MESSAGE)
421  << message(message_const::ERROR_MSG, outputMessage.getErrorMsg())
422  << message(message_const::ERROR_CODE, outputMessage.getErrorCode()) << flush;
423 
424  return resJson;
425 }
426 //-----------------------------------------------------------------------------
428 {
429  _isError = false;
430  errorMsg.clear();
432  std::string resultData;
433  timeval tvStart, tvStop;
434  gettimeofday(&tvStart, 0); //gettimeofday does not support TZ adjust on Linux.
435  bool needDefaultJson = false;
436  try
437  {
438  if (inputJsonMessage.getType() == SphinxInputJsonMessage::MessageType::mtSearch)
439  {
440  needDefaultJson = true;
441  resultData = searcher.makeSearchCommand(inputJsonMessage.getData());
442  if (resultData.empty())
443  resultData = makeDefaultJSON(message(message_const::EMPTY_RESULT_DATA));
444  }
445  else if (inputJsonMessage.getType() == SphinxInputJsonMessage::MessageType::mtIndex)
446  {
447  indexer.makeIndexCommand(inputJsonMessage.getData());
448  }
449  else if (inputJsonMessage.getType() == SphinxInputJsonMessage::MessageType::mtManage)
450  {
451  indexer.makeAdminCommand(inputJsonMessage.getData(), resultData);
452  }
453  else
454  throw Poco::Exception(message(message_const::INCORRECT_JSON_TYPE), COMMAND_ERROR);
455  }
456  catch(Poco::Exception& e)
457  {
458  applyError(e.message(), e.code());
459  if (needDefaultJson)
460  resultData = makeDefaultJSON(message(message_const::ERROR, getErrorMsg()));
461  Poco::Logger::root().log(e);
462  }
463  catch(std::exception& e)
464  {
465  applyError(e.what(), COMMAND_ERROR);
466  if (needDefaultJson)
467  resultData = makeDefaultJSON(message(message_const::ERROR, getErrorMsg()));
468  Poco::Logger::root().error(e.what());
469  }
470 
471  gettimeofday(&tvStop, 0);
472 
473  SphinxOutputJsonMessage outputMessage;
474  outputMessage.setErrorCode(getErrorCode());
475  outputMessage.setErrorMessage(getErrorMsg());
476  outputMessage.setData(resultData);
477  outputMessage.setTime(SphinxSearcher::getTimeInterval(tvStart, tvStop)/1000);
478 
479  return outputMessage;
480 }
481 //-----------------------------------------------------------------------------
482 std::string SphinxFunctionalObject::makeDefaultJSON(const std::string& logMsg)
483 {
484  log(LoggerStream::Priority::PRIO_DEBUG) << logMsg << " " << message(message_const::GENERATE_DEFAULT_JSON) << flush;
485  SphinxDefaultJSON defaultJson;
486  return defaultJson.getJSON();
487 }
488 //-----------------------------------------------------------------------------
489 std::string SphinxFunctionalObject::makeErrorMessage(const std::string& inputMessage, unsigned int errorCode, const std::string& errorMessage)
490 {
491  std::string resultMessageBody;
492  HCE::sphinx::SphinxOutputJsonMessage outputJsonMessage;
493  HCE::sphinx::SphinxInputJsonMessage inputJsonMessage(inputMessage);
494  if ((inputJsonMessage.getType() == HCE::sphinx::SphinxInputJsonMessage::MessageType::mtSearch) && !inputJsonMessage.isError())
495  {
496  unsigned int queryId = 0;
497  HCE::sphinx::SphinxInputJsonMessageSearch messageSearch(inputJsonMessage.getData());
498  for (size_t k=0;k<messageSearch.getQueryParameters().size();++k)
499  {
501  {
502  std::istringstream(messageSearch.getQueryParameters()[k].second) >> queryId;
503  break;
504  }
505  }
506  if (queryId)
507  {
509  requestInfo.setQueryId(queryId);
511  resultData.addRequestInfo(std::forward<HCE::sphinx::SphinxRequestInfo>(requestInfo));
512 
513  HCE::sphinx::SphinxResultDataSerializator resultSerializator(resultData);
514  if (resultSerializator.serialize(resultMessageBody))
515  outputJsonMessage.setData(resultMessageBody);
516  }
517  }
518 
519  outputJsonMessage.setErrorCode(errorCode);
520  outputJsonMessage.setErrorMessage(errorMessage);
521 
522  if (!outputJsonMessage.serialize(resultMessageBody))
523  {
524  std::stringstream errMsg;
525  errMsg << message_const::serializationOutputJsonMessageError << ": " << outputJsonMessage.getErrorMsg();
526  Poco::Logger::root().log(Poco::Message(sphinx_search_const::moduleName, errMsg.str(), Poco::Message::Priority::PRIO_ERROR));
527  }
528  return resultMessageBody;
529 }
530 //-----------------------------------------------------------------------------
531 //-----------------------------------------------------------------------------
532 } // namespace sphinx
533 } // namespace HCE