hce-node application  1.4.3
HCE Hierarchical Cluster Engine node application
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
DRCETaskReducersManager.cpp
Go to the documentation of this file.
2 #include "Exceptions.hpp"
4 
8 
9 namespace HCE {
10 namespace drce{
11 namespace reduce_task {
12 
13 
15 
17 {
18  return HCE::types::MessageType::mtDrce;
19 }
20 
21 // cppcheck-suppress unusedFunction
22 bool DRCETaskReducersManager::isReducerExist(unsigned long long reduceTaskId)
23 {
24  return taskReduceResults_.find( reduceTaskId ) != taskReduceResults_.end();
25 }
26 
27 void DRCETaskReducersManager::createReducer(unsigned long long reduceTaskId)
28 {
29  auto iter = taskReduceResults_.find( reduceTaskId );
30 
31  if( iter == taskReduceResults_.end() ) {
32  taskReduceResults_[ reduceTaskId ] = new DRCEResultData();
33  }
34 }
35 
36 void DRCETaskReducersManager::addDataInReducer(unsigned long long reduceTaskId, const std::string& json)
37 {
38  auto iter = taskReduceResults_.find( reduceTaskId );
39  if( iter == taskReduceResults_.end() ) {
40  throw NotFoundByKeyException( "Not exist task id " + std::to_string( reduceTaskId ) );
41  }
42 
43  DRCEResultData tmpResult;
44 
45  DRCEResultDataSerializator resultSerializator( tmpResult );
46 
47  if( resultSerializator.unserialize( json ) ) {
48 
49  Poco::SharedPtr< DRCEResultData > resultData = iter->second;
50 
51  for( size_t i = 0, n = tmpResult.getItemsCount(); i < n; i++ ) {
52  resultData->addDataItem( tmpResult.getDataItem( i ) );
53  }
54  } else {
55  throw WrongJSONStructureException( resultSerializator.getErrorMsg() );
56  }
57 }
58 
59 std::string DRCETaskReducersManager::runReduceTaskForTaskId(unsigned long long reduceTaskId)
60 {
61  auto iter = taskReduceResults_.find( reduceTaskId );
62 
63  if( iter == taskReduceResults_.end() ) {
64  throw NotFoundByKeyException( "Not exist task id " + std::to_string( reduceTaskId ) );
65  }
66 
67  Poco::SharedPtr< DRCEResultData > resultData = iter->second;
68 
69  DRCEResultDataSerializator resultSerializator( *resultData );
70  std::string json;
71 
72  if( resultSerializator.serialize( json ) ) {
73  return json;
74  }
75 
76  throw NotFoundByKeyException( resultSerializator.getErrorMsg() );
77 }
78 
79 void DRCETaskReducersManager::deleteReducerBy(unsigned long long reduceTaskId)
80 {
81  if( taskReduceResults_.erase( reduceTaskId ) != 1 ) {
82  throw NotFoundByKeyException( "Not exist task id " + std::to_string( reduceTaskId ) );
83  }
84 }
85 
86 }
87 }
88 }