DBDirectoryProcessor.cpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. /* Copyright (C) 2014 Carlos Aguilar Melchor, Joris Barrier, Marc-Olivier Killijian
  2. * This file is part of XPIR.
  3. *
  4. * XPIR is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU General Public License as published by
  6. * the Free Software Foundation, either version 3 of the License, or
  7. * (at your option) any later version.
  8. *
  9. * XPIR is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with XPIR. If not, see <http://www.gnu.org/licenses/>.
  16. */
  #include "DBDirectoryProcessor.hpp"

  #include <algorithm>
  #include <cstring>
  18. /************************************************/
  19. /* Default constructor : no splitting */
  20. /* -> 1 input file -> 1 output stream */
  21. /************************************************/
  22. DBDirectoryProcessor::DBDirectoryProcessor() : filesSplitting(false) {
  23. // TODO(feature) deal with sub-directories in the database !
  24. directory=std::string(DEFAULT_DIR_NAME);
  25. maxFileBytesize=0;
  26. // Create the pool of ifstream
  27. for(int i=0;i<NB_FILE_DESCRIPTORS;i++)
  28. fdPool.push_back(new std::ifstream());
  29. // Then create the catalog and get the filenumbers and size
  30. DIR *dir = opendir (directory.c_str());
  31. struct dirent *ent = nullptr;
  32. // If there is no error when opening the directory
  33. if (dir != NULL)
  34. {
  35. uint32_t i=0;
  36. // For each entry
  37. while ( (ent = readdir (dir)) != NULL)
  38. {
  39. // Ignore files . and ..
  40. if (strcmp(ent->d_name, ".") > 0 && strcmp(ent->d_name, ".."))
  41. {
  42. // Count processed files (one out of 2**7?)
  43. if ((i << 25)==0) std::cout << "DBDirectoryProcessor: " << i+1 << " entries processed\r" << std::flush;i++;
  44. // Add File object on the file list
  45. std::string fileName= std::string( ent->d_name );
  46. file_list.push_back( fileName );
  47. uint64_t fileSize = getFileSize(directory + fileName);
  48. if (fileSize > maxFileBytesize)
  49. maxFileBytesize = fileSize;
  50. }
  51. }
  52. std::cout << "DBDirectoryProcessor: " << i << " entries processed" << std::endl;
  53. closedir (dir);
  54. }
  55. else // If there was a problem opening the directory
  56. {
  57. std::cout << "DBDirectoryProcessor: Error opening database directory" << std::endl;
  58. }
  59. std::cout << "DBDirectoryProcessor: The size of the database is " << maxFileBytesize*file_list.size() << " bytes" << std::endl;
  60. std::cout << "DBDirectoryProcessor: The number of elements in the catalog is " << file_list.size() << std::endl;
  61. }
  62. // This constructor is called when we need File-splitting
  63. DBDirectoryProcessor::DBDirectoryProcessor(uint64_t nbStreams) : filesSplitting(true) {
  64. directory=std::string(DEFAULT_DIR_NAME);
  65. maxFileBytesize=0;
  66. // Create the pool of ifstream
  67. for(int i=0;i<NB_FILE_DESCRIPTORS;i++)
  68. fdPool.push_back(new std::ifstream());
  69. // Then create the catalog and get the filenumbers and size
  70. DIR *dir = opendir (directory.c_str());
  71. struct dirent *ent = nullptr;
  72. // If there is no error when opening the directory
  73. if (dir != NULL)
  74. {
  75. ent = readdir (dir);
  76. // WARNING: In case of file-splitting, we deal only with the first file
  77. // On some filesystems, the dir contains also special files such as "." and "..", skip them
  78. while (ent->d_name == NULL || ent->d_type != DT_REG) {
  79. ent = readdir (dir);
  80. }
  81. // Add File object on the file list
  82. std::string fileName=directory + std::string( ent->d_name );
  83. realFileName=fileName;
  84. uint64_t realFileSize = getFileSize(realFileName);
  85. maxFileBytesize = realFileSize/nbStreams;
  86. if(maxFileBytesize==0) {
  87. std::cout << "DBDirectoryProcessor: ERROR cannot split a file en less than one byte elements!" << std::endl;
  88. std::cout << "DBDirectoryProcessor: file " << realFileName << " is only "<< realFileSize << " long" << std::endl;
  89. exit(1);
  90. }
  91. closedir (dir);
  92. for(int i=0;i<nbStreams;i++) {
  93. file_list.push_back( std::to_string(i) );
  94. }
  95. }
  96. else // If there was a problem opening the directory
  97. {
  98. std::cout << "DBDirectoryProcessor: Error when opening directory " <<directory<< std::endl;
  99. exit(1);
  100. }
  101. #ifdef DEBUG
  102. std::cout << "maxFileBytesize." <<maxFileBytesize<< std::endl;
  103. std::cout << "file_list.size()." <<file_list.size()<< std::endl;
  104. #endif
  105. std::cout << "DBDirectoryProcessor: The size of the database is " << maxFileBytesize*file_list.size() << " bytes" << std::endl;
  106. std::cout << "DBDirectoryProcessor: The number of elements in the catalog is " << file_list.size() << std::endl;
  107. }
  108. DBDirectoryProcessor::~DBDirectoryProcessor() {
  109. for (auto ifs : fdPool) delete ifs;
  110. }
  111. std::string DBDirectoryProcessor::getCatalog(const bool typeOfCatalog) {
  112. std::string buf;
  113. directory=std::string(DEFAULT_DIR_NAME);
  114. if(typeOfCatalog) {
  115. // Start with the number of elements in the catalog
  116. buf = std::to_string((unsigned int)0)+ "\n";
  117. buf += std::to_string(getNbStream())+ "\n";
  118. // Then for each file contactenate (with newlines) filename and filesize
  119. for (auto f : file_list)
  120. {
  121. if(!filesSplitting) {
  122. buf += f + "\n" + std::to_string(getFileSize(directory+f)) + "\n";
  123. } else {
  124. buf += f + "\n" + std::to_string(getmaxFileBytesize()) + "\n";
  125. }
  126. }
  127. return buf;
  128. }
  129. // else we want a compact representation, i.e. nbFiles / fileSize
  130. buf = std::to_string((unsigned int)1)+ "\n";
  131. buf += std::to_string(getNbStream())+ "\n";
  132. buf += std::to_string(maxFileBytesize)+ "\n";
  133. return buf;
  134. }
  135. uint64_t DBDirectoryProcessor::getDBSizeBits() {
  136. return maxFileBytesize*file_list.size()*8;
  137. }
  138. uint64_t DBDirectoryProcessor::getNbStream() {
  139. return file_list.size();
  140. }
  141. uint64_t DBDirectoryProcessor::getmaxFileBytesize() {
  142. return maxFileBytesize;
  143. }
  144. std::ifstream* DBDirectoryProcessor::openStream(uint64_t streamNb, uint64_t requested_offset) {
  145. std::string local_directory(DEFAULT_DIR_NAME);
  146. std::ifstream* is = fdPool.back();
  147. fdPool.pop_back();
  148. // When there is no splitting, each ifstream is associated with a real file
  149. // (at least when no aggregation is done which is the case for now)
  150. if(!filesSplitting) {
  151. is->open( local_directory + file_list[streamNb], std::ios::binary );
  152. is->seekg(requested_offset);
  153. } else {
  154. // But when we are doing file splitting, we just need to position the ifstream at the correct position
  155. uint64_t splitting_offset=streamNb*getmaxFileBytesize();
  156. is->open( realFileName, std::ios::binary );
  157. is->seekg(splitting_offset + requested_offset);
  158. }
  159. return is;
  160. }
  161. uint64_t DBDirectoryProcessor::readStream(std::ifstream* s, char * buf, uint64_t size) {
  162. uint64_t sizeRead=0;
  163. //std::cout << "sizeRead = "<<sizeRead<<" size = "<<size<<std::endl;
  164. while(sizeRead<size) {
  165. uint64_t readThisrun=s->readsome(buf+sizeRead,size-sizeRead);
  166. sizeRead+=readThisrun;
  167. // Check if we need to pad
  168. if(readThisrun==0 && sizeRead<size) {
  169. // std::cout << "padding = "<<size-sizeRead<<std::endl;
  170. bzero(buf+sizeRead,size-sizeRead);
  171. sizeRead=size;
  172. }
  173. }
  174. return size;
  175. }
  176. void DBDirectoryProcessor::closeStream(std::ifstream* s) {
  177. s->close();
  178. fdPool.push_back(s);
  179. }
  180. std::streampos DBDirectoryProcessor::getFileSize( std::string filePath ){
  181. std::streampos fsize = 0;
  182. std::ifstream file( filePath.c_str(), std::ios::binary );
  183. fsize = file.tellg();
  184. file.seekg( 0, std::ios::end );
  185. fsize = file.tellg() - fsize;
  186. file.close();
  187. return fsize;
  188. }
  189. void DBDirectoryProcessor::readAggregatedStream(uint64_t streamNb, uint64_t alpha, uint64_t offset, uint64_t bytes_per_file, char* rawBits){
  190. uint64_t fileByteSize = std::min(bytes_per_file, getmaxFileBytesize()-offset);
  191. uint64_t startStream = streamNb*alpha;
  192. uint64_t endStream = std::min(streamNb*alpha + alpha - 1, getNbStream() - 1);
  193. uint64_t paddingStreams = (streamNb*alpha+alpha) >= getNbStream() ? (streamNb*alpha+alpha) - getNbStream() : 0;
  194. #pragma omp critical
  195. {
  196. for (int i=startStream; i <= endStream; i++)
  197. {
  198. std::ifstream *stream = openStream(i, offset);
  199. // Just read the file (plus padding for that file)
  200. readStream(stream, rawBits + (i % alpha) * fileByteSize, fileByteSize);
  201. closeStream(stream);
  202. }
  203. if(paddingStreams !=0)
  204. {
  205. bzero(rawBits + (endStream % alpha) * fileByteSize, fileByteSize*paddingStreams);
  206. }
  207. }
  208. }