DBDirectoryProcessor.cpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /* Copyright (C) 2014 Carlos Aguilar Melchor, Joris Barrier, Marc-Olivier Killijian
  2. * This file is part of XPIR.
  3. *
  4. * XPIR is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU General Public License as published by
  6. * the Free Software Foundation, either version 3 of the License, or
  7. * (at your option) any later version.
  8. *
  9. * XPIR is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with XPIR. If not, see <http://www.gnu.org/licenses/>.
  16. */
  17. #include "DBDirectoryProcessor.hpp"
  18. /************************************************/
  19. /* Default constructor : no splitting */
  20. /* -> 1 input file -> 1 output stream */
  21. /************************************************/
  22. DBDirectoryProcessor::DBDirectoryProcessor() : filesSplitting(false) {
  23. // TODO(feature) deal with sub-directories in the database !
  24. directory=std::string(DEFAULT_DIR_NAME);
  25. maxFileBytesize=0;
  26. // Then create the catalog and get the filenumbers and size
  27. DIR *dir = opendir (directory.c_str());
  28. struct dirent *ent = nullptr;
  29. // If there is no error when opening the directory
  30. if (dir != NULL)
  31. {
  32. uint32_t i=0;
  33. // For each entry
  34. while ( (ent = readdir (dir)) != NULL)
  35. {
  36. // Ignore files . and ..
  37. if (strcmp(ent->d_name, ".") > 0 && strcmp(ent->d_name, ".."))
  38. {
  39. // Count processed files (one out of 2**7?)
  40. if ((i << 25)==0) std::cout << "DBDirectoryProcessor: " << i+1 << " entries processed\r" << std::flush;i++;
  41. // Add File object on the file list
  42. std::string fileName= std::string( ent->d_name );
  43. file_list.push_back( fileName );
  44. uint64_t fileSize = getFileSize(directory + fileName);
  45. if (fileSize > maxFileBytesize)
  46. maxFileBytesize = fileSize;
  47. }
  48. }
  49. std::cout << "DBDirectoryProcessor: " << i << " entries processed" << std::endl;
  50. if (i==0) {
  51. std::cout <<"DBDirectoryProcessor: No entries in the database" << std::endl;
  52. error = true;
  53. }
  54. closedir (dir);
  55. }
  56. else // If there was a problem opening the directory
  57. {
  58. std::cout << "DBDirectoryProcessor: Error opening database directory" << std::endl;
  59. error = true;
  60. }
  61. std::cout << "DBDirectoryProcessor: The size of the database is " << maxFileBytesize*file_list.size() << " bytes" << std::endl;
  62. std::cout << "DBDirectoryProcessor: The number of elements in the catalog is " << file_list.size() << std::endl;
  63. }
  64. // This constructor is called when we need File-splitting
  65. DBDirectoryProcessor::DBDirectoryProcessor(uint64_t nbStreams) : filesSplitting(true) {
  66. directory=std::string(DEFAULT_DIR_NAME);
  67. maxFileBytesize=0;
  68. // Then create the catalog and get the filenumbers and size
  69. DIR *dir = opendir (directory.c_str());
  70. struct dirent *ent = nullptr;
  71. // If there is no error when opening the directory
  72. if (dir != NULL)
  73. {
  74. ent = readdir (dir);
  75. // WARNING: In case of file-splitting, we deal only with the first file
  76. // On some filesystems, the dir contains also special files such as "." and "..", skip them
  77. while (ent->d_name == NULL || ent->d_type != DT_REG) {
  78. ent = readdir (dir);
  79. }
  80. // Add File object on the file list
  81. std::string fileName=directory + std::string( ent->d_name );
  82. realFileName=fileName;
  83. uint64_t realFileSize = getFileSize(realFileName);
  84. maxFileBytesize = realFileSize/nbStreams;
  85. if(maxFileBytesize==0) {
  86. std::cout << "DBDirectoryProcessor: ERROR cannot split a file en less than one byte elements!" << std::endl;
  87. std::cout << "DBDirectoryProcessor: file " << realFileName << " is only "<< realFileSize << " long" << std::endl;
  88. error = true;
  89. }
  90. closedir (dir);
  91. for(int i=0;i<nbStreams;i++) {
  92. file_list.push_back( std::to_string(i) );
  93. }
  94. }
  95. else // If there was a problem opening the directory
  96. {
  97. std::cout << "DBDirectoryProcessor: Error when opening directory " <<directory<< std::endl;
  98. error = true;
  99. }
  100. #ifdef DEBUG
  101. std::cout << "maxFileBytesize." <<maxFileBytesize<< std::endl;
  102. std::cout << "file_list.size()." <<file_list.size()<< std::endl;
  103. #endif
  104. std::cout << "DBDirectoryProcessor: The size of the database is " << maxFileBytesize*file_list.size() << " bytes" << std::endl;
  105. std::cout << "DBDirectoryProcessor: The number of elements in the catalog is " << file_list.size() << std::endl;
  106. }
  107. DBDirectoryProcessor::~DBDirectoryProcessor() {
  108. for(auto it : fdPool) {
  109. delete it.second;
  110. }
  111. }
  112. std::string DBDirectoryProcessor::getCatalog(const bool typeOfCatalog) {
  113. std::string buf;
  114. directory=std::string(DEFAULT_DIR_NAME);
  115. if(typeOfCatalog) {
  116. // Start with the number of elements in the catalog
  117. buf = std::to_string((unsigned int)0)+ "\n";
  118. buf += std::to_string(getNbStream())+ "\n";
  119. // Then for each file contactenate (with newlines) filename and filesize
  120. for (auto f : file_list)
  121. {
  122. if(!filesSplitting) {
  123. buf += f + "\n" + std::to_string(getFileSize(directory+f)) + "\n";
  124. } else {
  125. buf += f + "\n" + std::to_string(getmaxFileBytesize()) + "\n";
  126. }
  127. }
  128. return buf;
  129. }
  130. // else we want a compact representation, i.e. nbFiles / fileSize
  131. buf = std::to_string((unsigned int)1)+ "\n";
  132. buf += std::to_string(getNbStream())+ "\n";
  133. buf += std::to_string(maxFileBytesize)+ "\n";
  134. return buf;
  135. }
  136. uint64_t DBDirectoryProcessor::getDBSizeBits() {
  137. return maxFileBytesize*file_list.size()*8;
  138. }
  139. uint64_t DBDirectoryProcessor::getNbStream() {
  140. return file_list.size();
  141. }
  142. uint64_t DBDirectoryProcessor::getmaxFileBytesize() {
  143. return maxFileBytesize;
  144. }
  145. bool DBDirectoryProcessor::getErrorStatus() {
  146. return error;
  147. }
  148. bool DBDirectoryProcessor::openStream(uint64_t streamNb, uint64_t requested_offset) {
  149. if(fdPool.count(streamNb)) {
  150. return false;
  151. }
  152. std::string local_directory(DEFAULT_DIR_NAME);
  153. std::ifstream* is = new std::ifstream();
  154. // When there is no splitting, each ifstream is associated with a real file
  155. // (at least when no aggregation is done which is the case for now)
  156. if(!filesSplitting) {
  157. is->open( local_directory + file_list[streamNb], std::ios::binary );
  158. is->seekg(requested_offset);
  159. } else {
  160. // But when we are doing file splitting, we just need to position the ifstream at the correct position
  161. uint64_t splitting_offset=streamNb*getmaxFileBytesize();
  162. is->open( realFileName, std::ios::binary );
  163. is->seekg(splitting_offset + requested_offset);
  164. }
  165. fdPool.insert( std::pair<uint64_t, std::ifstream*>(streamNb, is));
  166. return true;
  167. }
  168. uint64_t DBDirectoryProcessor::readStream(uint64_t streamNb, char * buf, uint64_t size) {
  169. std::ifstream *s = fdPool[streamNb];
  170. uint64_t sizeRead=0;
  171. //std::cout << "sizeRead = "<<sizeRead<<" size = "<<size<<std::endl;
  172. while(sizeRead<size) {
  173. uint64_t readThisrun=s->readsome(buf+sizeRead,size-sizeRead);
  174. sizeRead+=readThisrun;
  175. // Check if we need to pad
  176. if(readThisrun==0 && sizeRead<size) {
  177. // std::cout << "padding = "<<size-sizeRead<<std::endl;
  178. bzero(buf+sizeRead,size-sizeRead);
  179. sizeRead=size;
  180. }
  181. }
  182. return size;
  183. }
  184. void DBDirectoryProcessor::closeStream(uint64_t streamNb) {
  185. if(!fdPool.count(streamNb)) {
  186. return;
  187. }
  188. std::map<uint64_t, std::ifstream*>::iterator it = fdPool.find(streamNb);
  189. it->second->close();
  190. fdPool.erase(it);
  191. }
  192. std::streampos DBDirectoryProcessor::getFileSize( std::string filePath ){
  193. std::streampos fsize = 0;
  194. std::ifstream file( filePath.c_str(), std::ios::binary );
  195. fsize = file.tellg();
  196. file.seekg( 0, std::ios::end );
  197. fsize = file.tellg() - fsize;
  198. file.close();
  199. return fsize;
  200. }