@@ -27,10 +27,6 @@ DBDirectoryProcessor::DBDirectoryProcessor() : filesSplitting(false) {
     directory=std::string(DEFAULT_DIR_NAME);
     maxFileBytesize=0;

-    // Create the pool of ifstream
-    for(int i=0;i<NB_FILE_DESCRIPTORS;i++)
-        fdPool.push_back(new std::ifstream());
-
     // Then create the catalog and get the filenumbers and size
     DIR *dir = opendir (directory.c_str());
     struct dirent *ent = nullptr;
@@ -78,10 +74,6 @@ DBDirectoryProcessor::DBDirectoryProcessor(uint64_t nbStreams) : filesSplitting(
     directory=std::string(DEFAULT_DIR_NAME);
     maxFileBytesize=0;

-    // Create the pool of ifstream
-    for(int i=0;i<NB_FILE_DESCRIPTORS;i++)
-        fdPool.push_back(new std::ifstream());
-
     // Then create the catalog and get the filenumbers and size
     DIR *dir = opendir (directory.c_str());
     struct dirent *ent = nullptr;
@@ -129,7 +121,9 @@ DBDirectoryProcessor::DBDirectoryProcessor(uint64_t nbStreams) : filesSplitting(
 }

 DBDirectoryProcessor::~DBDirectoryProcessor() {
-    for (auto ifs : fdPool) delete ifs;
+    for(auto it : fdPool) {
+        delete it.second;
+    }
 }

 std::string DBDirectoryProcessor::getCatalog(const bool typeOfCatalog) {
@@ -170,11 +164,13 @@ bool DBDirectoryProcessor::getErrorStatus() {
     return error;
 }

-std::ifstream* DBDirectoryProcessor::openStream(uint64_t streamNb, uint64_t requested_offset) {
+bool DBDirectoryProcessor::openStream(uint64_t streamNb, uint64_t requested_offset) {
+    if(fdPool.count(streamNb)) {
+        return false;
+    }
     std::string local_directory(DEFAULT_DIR_NAME);

-    std::ifstream* is = fdPool.back();
-    fdPool.pop_back();
+    std::ifstream* is = new std::ifstream();
     // When there is no splitting, each ifstream is associated with a real file
     // (at least when no aggregation is done which is the case for now)
     if(!filesSplitting) {
@@ -186,10 +182,12 @@ std::ifstream* DBDirectoryProcessor::openStream(uint64_t streamNb, uint64_t requ
         is->open( realFileName, std::ios::binary );
         is->seekg(splitting_offset + requested_offset);
     }
-    return is;
+    fdPool.insert( std::pair<uint64_t, std::ifstream*>(streamNb, is));
+    return true;
 }

-uint64_t DBDirectoryProcessor::readStream(std::ifstream* s, char * buf, uint64_t size) {
+uint64_t DBDirectoryProcessor::readStream(uint64_t streamNb, char * buf, uint64_t size) {
+    std::ifstream *s = fdPool[streamNb];
     uint64_t sizeRead=0;
     //std::cout << "sizeRead = "<<sizeRead<<" size = "<<size<<std::endl;
     while(sizeRead<size) {
@@ -205,9 +203,13 @@ uint64_t DBDirectoryProcessor::readStream(std::ifstream* s, char * buf, uint64_t
     return size;
 }

-void DBDirectoryProcessor::closeStream(std::ifstream* s) {
-    s->close();
-    fdPool.push_back(s);
+void DBDirectoryProcessor::closeStream(uint64_t streamNb) {
+    if(!fdPool.count(streamNb)) {
+        return;
+    }
+    std::map<uint64_t, std::ifstream*>::iterator it = fdPool.find(streamNb);
+    it->second->close();
+    fdPool.erase(it);
 }

 std::streampos DBDirectoryProcessor::getFileSize( std::string filePath ){
@@ -230,12 +232,12 @@ void DBDirectoryProcessor::readAggregatedStream(uint64_t streamNb, uint64_t alph
     {
         for (int i=startStream; i <= endStream; i++)
         {
-            std::ifstream *stream = openStream(i, offset);
+            openStream(i, offset);

             // Just read the file (plus padding for that file)
-            readStream(stream, rawBits + (i % alpha) * fileByteSize, fileByteSize);
+            readStream(i, rawBits + (i % alpha) * fileByteSize, fileByteSize);

-            closeStream(stream);
+            closeStream(i);
         }

         if(paddingStreams !=0)
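
For context, the sketch below (not part of the patch, and not XPIR code) illustrates the calling pattern the diff introduces: streams are addressed by a numeric identifier kept in a std::map<uint64_t, std::ifstream*> rather than by passing std::ifstream pointers between openStream, readStream and closeStream. The StreamRegistry class, the file path and the ids are hypothetical and only meant to show the id-keyed open/read/close flow.

// Minimal, self-contained sketch (hypothetical, not XPIR code) of the
// id-keyed stream handling used by the patch above.
#include <cstdint>
#include <fstream>
#include <iostream>
#include <map>
#include <string>
#include <vector>

class StreamRegistry {
public:
    ~StreamRegistry() {
        // Mirror the reworked destructor: release any stream still registered.
        for (auto& kv : pool) delete kv.second;
    }

    // Refuse to open the same stream id twice, as the patched openStream does.
    bool openStream(uint64_t id, const std::string& path, uint64_t offset) {
        if (pool.count(id)) return false;
        std::ifstream* is = new std::ifstream(path, std::ios::binary);
        if (!is->is_open()) { delete is; return false; }
        is->seekg(offset);
        pool.insert(std::pair<uint64_t, std::ifstream*>(id, is));
        return true;
    }

    // Read up to 'size' bytes from the stream registered under 'id'.
    uint64_t readStream(uint64_t id, char* buf, uint64_t size) {
        auto it = pool.find(id);
        if (it == pool.end()) return 0;
        it->second->read(buf, static_cast<std::streamsize>(size));
        return static_cast<uint64_t>(it->second->gcount());
    }

    // Close the stream and drop its map entry; freeing it here avoids a leak.
    void closeStream(uint64_t id) {
        auto it = pool.find(id);
        if (it == pool.end()) return;
        it->second->close();
        delete it->second;
        pool.erase(it);
    }

private:
    std::map<uint64_t, std::ifstream*> pool;
};

int main() {
    StreamRegistry reg;
    std::vector<char> buf(4096);
    // "db/file_0" is a placeholder path for the example.
    if (reg.openStream(0, "db/file_0", /*offset=*/0)) {
        uint64_t n = reg.readStream(0, buf.data(), buf.size());
        std::cout << "read " << n << " bytes from stream 0\n";
        reg.closeStream(0);
    }
    return 0;
}

Keying the pool by stream number is what lets the last hunk call openStream, readStream and closeStream with the same index i instead of threading an ifstream pointer through readAggregatedStream.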