DBDirectoryProcessor.cpp 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. /* Copyright (C) 2014 Carlos Aguilar Melchor, Joris Barrier, Marc-Olivier Killijian
  2. * This file is part of XPIR.
  3. *
  4. * XPIR is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU General Public License as published by
  6. * the Free Software Foundation, either version 3 of the License, or
  7. * (at your option) any later version.
  8. *
  9. * XPIR is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with XPIR. If not, see <http://www.gnu.org/licenses/>.
  16. */
  17. #include "DBDirectoryProcessor.hpp"
  18. /************************************************/
  19. /* Default constructor : no splitting */
  20. /* -> 1 input file -> 1 output stream */
  21. /************************************************/
  22. DBDirectoryProcessor::DBDirectoryProcessor() : filesSplitting(false) {
  23. // TODO(feature) deal with sub-directories in the database !
  24. directory=std::string(DEFAULT_DIR_NAME);
  25. maxFileBytesize=0;
  26. // Create the pool of ifstream
  27. for(int i=0;i<NB_FILE_DESCRIPTORS;i++)
  28. fdPool.push_back(new std::ifstream());
  29. // Then create the catalog and get the filenumbers and size
  30. DIR *dir = opendir (directory.c_str());
  31. struct dirent *ent = nullptr;
  32. // If there is no error when opening the directory
  33. if (dir != NULL)
  34. {
  35. uint32_t i=0;
  36. // For each entry
  37. while ( (ent = readdir (dir)) != NULL)
  38. {
  39. // Ignore files . and ..
  40. if (strcmp(ent->d_name, ".") > 0 && strcmp(ent->d_name, ".."))
  41. {
  42. // Count processed files (one out of 2**7?)
  43. if ((i << 25)==0) std::cout << "DBDirectoryProcessor: " << i+1 << " entries processed\r" << std::flush;i++;
  44. // Add File object on the file list
  45. std::string fileName= std::string( ent->d_name );
  46. file_list.push_back( fileName );
  47. uint64_t fileSize = getFileSize(directory + fileName);
  48. if (fileSize > maxFileBytesize)
  49. maxFileBytesize = fileSize;
  50. }
  51. }
  52. std::cout << "DBDirectoryProcessor: " << i << " entries processed" << std::endl;
  53. if (i==0) {
  54. std::cout <<"DBDirectoryProcessor: No entries in the database" << std::endl;
  55. error = true;
  56. }
  57. closedir (dir);
  58. }
  59. else // If there was a problem opening the directory
  60. {
  61. std::cout << "DBDirectoryProcessor: Error opening database directory" << std::endl;
  62. error = true;
  63. }
  64. std::cout << "DBDirectoryProcessor: The size of the database is " << maxFileBytesize*file_list.size() << " bytes" << std::endl;
  65. std::cout << "DBDirectoryProcessor: The number of elements in the catalog is " << file_list.size() << std::endl;
  66. }
  67. // This constructor is called when we need File-splitting
  68. DBDirectoryProcessor::DBDirectoryProcessor(uint64_t nbStreams) : filesSplitting(true) {
  69. directory=std::string(DEFAULT_DIR_NAME);
  70. maxFileBytesize=0;
  71. // Create the pool of ifstream
  72. for(int i=0;i<NB_FILE_DESCRIPTORS;i++)
  73. fdPool.push_back(new std::ifstream());
  74. // Then create the catalog and get the filenumbers and size
  75. DIR *dir = opendir (directory.c_str());
  76. struct dirent *ent = nullptr;
  77. // If there is no error when opening the directory
  78. if (dir != NULL)
  79. {
  80. ent = readdir (dir);
  81. // WARNING: In case of file-splitting, we deal only with the first file
  82. // On some filesystems, the dir contains also special files such as "." and "..", skip them
  83. while (ent->d_name == NULL || ent->d_type != DT_REG) {
  84. ent = readdir (dir);
  85. }
  86. // Add File object on the file list
  87. std::string fileName=directory + std::string( ent->d_name );
  88. realFileName=fileName;
  89. uint64_t realFileSize = getFileSize(realFileName);
  90. maxFileBytesize = realFileSize/nbStreams;
  91. if(maxFileBytesize==0) {
  92. std::cout << "DBDirectoryProcessor: ERROR cannot split a file en less than one byte elements!" << std::endl;
  93. std::cout << "DBDirectoryProcessor: file " << realFileName << " is only "<< realFileSize << " long" << std::endl;
  94. error = true;
  95. }
  96. closedir (dir);
  97. for(int i=0;i<nbStreams;i++) {
  98. file_list.push_back( std::to_string(i) );
  99. }
  100. }
  101. else // If there was a problem opening the directory
  102. {
  103. std::cout << "DBDirectoryProcessor: Error when opening directory " <<directory<< std::endl;
  104. error = true;
  105. }
  106. #ifdef DEBUG
  107. std::cout << "maxFileBytesize." <<maxFileBytesize<< std::endl;
  108. std::cout << "file_list.size()." <<file_list.size()<< std::endl;
  109. #endif
  110. std::cout << "DBDirectoryProcessor: The size of the database is " << maxFileBytesize*file_list.size() << " bytes" << std::endl;
  111. std::cout << "DBDirectoryProcessor: The number of elements in the catalog is " << file_list.size() << std::endl;
  112. }
  113. DBDirectoryProcessor::~DBDirectoryProcessor() {
  114. for (auto ifs : fdPool) delete ifs;
  115. }
  116. std::string DBDirectoryProcessor::getCatalog(const bool typeOfCatalog) {
  117. std::string buf;
  118. directory=std::string(DEFAULT_DIR_NAME);
  119. if(typeOfCatalog) {
  120. // Start with the number of elements in the catalog
  121. buf = std::to_string((unsigned int)0)+ "\n";
  122. buf += std::to_string(getNbStream())+ "\n";
  123. // Then for each file contactenate (with newlines) filename and filesize
  124. for (auto f : file_list)
  125. {
  126. if(!filesSplitting) {
  127. buf += f + "\n" + std::to_string(getFileSize(directory+f)) + "\n";
  128. } else {
  129. buf += f + "\n" + std::to_string(getmaxFileBytesize()) + "\n";
  130. }
  131. }
  132. return buf;
  133. }
  134. // else we want a compact representation, i.e. nbFiles / fileSize
  135. buf = std::to_string((unsigned int)1)+ "\n";
  136. buf += std::to_string(getNbStream())+ "\n";
  137. buf += std::to_string(maxFileBytesize)+ "\n";
  138. return buf;
  139. }
  140. uint64_t DBDirectoryProcessor::getDBSizeBits() {
  141. return maxFileBytesize*file_list.size()*8;
  142. }
  143. uint64_t DBDirectoryProcessor::getNbStream() {
  144. return file_list.size();
  145. }
  146. uint64_t DBDirectoryProcessor::getmaxFileBytesize() {
  147. return maxFileBytesize;
  148. }
  149. bool DBDirectoryProcessor::getErrorStatus() {
  150. return error;
  151. }
  152. std::ifstream* DBDirectoryProcessor::openStream(uint64_t streamNb, uint64_t requested_offset) {
  153. std::string local_directory(DEFAULT_DIR_NAME);
  154. std::ifstream* is = fdPool.back();
  155. fdPool.pop_back();
  156. // When there is no splitting, each ifstream is associated with a real file
  157. // (at least when no aggregation is done which is the case for now)
  158. if(!filesSplitting) {
  159. is->open( local_directory + file_list[streamNb], std::ios::binary );
  160. is->seekg(requested_offset);
  161. } else {
  162. // But when we are doing file splitting, we just need to position the ifstream at the correct position
  163. uint64_t splitting_offset=streamNb*getmaxFileBytesize();
  164. is->open( realFileName, std::ios::binary );
  165. is->seekg(splitting_offset + requested_offset);
  166. }
  167. return is;
  168. }
  169. uint64_t DBDirectoryProcessor::readStream(std::ifstream* s, char * buf, uint64_t size) {
  170. uint64_t sizeRead=0;
  171. //std::cout << "sizeRead = "<<sizeRead<<" size = "<<size<<std::endl;
  172. while(sizeRead<size) {
  173. uint64_t readThisrun=s->readsome(buf+sizeRead,size-sizeRead);
  174. sizeRead+=readThisrun;
  175. // Check if we need to pad
  176. if(readThisrun==0 && sizeRead<size) {
  177. // std::cout << "padding = "<<size-sizeRead<<std::endl;
  178. bzero(buf+sizeRead,size-sizeRead);
  179. sizeRead=size;
  180. }
  181. }
  182. return size;
  183. }
  184. void DBDirectoryProcessor::closeStream(std::ifstream* s) {
  185. s->close();
  186. fdPool.push_back(s);
  187. }
  188. std::streampos DBDirectoryProcessor::getFileSize( std::string filePath ){
  189. std::streampos fsize = 0;
  190. std::ifstream file( filePath.c_str(), std::ios::binary );
  191. fsize = file.tellg();
  192. file.seekg( 0, std::ios::end );
  193. fsize = file.tellg() - fsize;
  194. file.close();
  195. return fsize;
  196. }
  197. void DBDirectoryProcessor::readAggregatedStream(uint64_t streamNb, uint64_t alpha, uint64_t offset, uint64_t bytes_per_file, char* rawBits){
  198. uint64_t fileByteSize = std::min(bytes_per_file, getmaxFileBytesize()-offset);
  199. uint64_t startStream = streamNb*alpha;
  200. uint64_t endStream = std::min(streamNb*alpha + alpha - 1, getNbStream() - 1);
  201. uint64_t paddingStreams = (streamNb*alpha+alpha) >= getNbStream() ? (streamNb*alpha+alpha) - getNbStream() : 0;
  202. #pragma omp critical
  203. {
  204. for (int i=startStream; i <= endStream; i++)
  205. {
  206. std::ifstream *stream = openStream(i, offset);
  207. // Just read the file (plus padding for that file)
  208. readStream(stream, rawBits + (i % alpha) * fileByteSize, fileByteSize);
  209. closeStream(stream);
  210. }
  211. if(paddingStreams !=0)
  212. {
  213. bzero(rawBits + (endStream % alpha) * fileByteSize, fileByteSize*paddingStreams);
  214. }
  215. }
  216. }