Stack Overflow Asked by Molly Birk on December 20, 2021
This is a follow on from a previous question here – I received some wonderful advice that helped me move my code along. For the next piece of the puzzle, I figured it warranted a new post – I hope that’s okay.
I have some code that creates requests in a main loop, to read from or write to a file and executes each request in its own thread. With the help I got from the earlier post, I was able to extend my code to add a request queue with multiple entries and read/write functions that merely sleep for a short time to emulate file access.
Now I want to actually learn how to read and write to/from the files when there can potentially one or more threads trying to read and/or write the same file at the same time.
To make this easier to test, I limit the file to a single instance otherwise I need to consider the cases where the file doesn’t exist etc. In the real application, there will be several hundred files in play but my limited understanding suggests that if I can make the locking mechanism work for a single file, it’ll work when there are many.
I am still trying improve my understanding of threading and first tried adding an std::mutex
with a global lock variable in the read_file()
& write_file()
functions but I got into a terrible mess.
Can someone please point me at the correct approach I should investigate to make this work in a robust fashion.
#include <fstream>
#include <future>
#include <iostream>
#include <sstream>
#include <iomanip>
#include <string>
#include <random>
std::vector< std::future<std::string> > requests;
int random_int(int start, int end)
{
std::random_device rd;
std::mt19937 generator(rd());
std::uniform_int_distribution<> distrib(start, end);
return distrib(generator);
}
const std::string generate_filename()
{
std::ostringstream filename;
// use a single file for testing
//filename << "file_" << std::setfill('0') << std::setw(2) << random_int(1, 20) << ".txt";
filename << "file.txt";
return filename.str();
}
std::string write_file(const std::string filename)
{
std::cout << "write_file: filename is " << filename << std::endl;
// slow things down so i can follow
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::ofstream ofs(filename);
if (!ofs)
{
return std::string("ERROR");
}
const char chr = 'A' + random_int(0, 25);
for (int i = 0; i < 64; ++i)
{
ofs << chr;
}
ofs << std::endl;
ofs.close();
std::cout << "write_file: written to " << filename << std::endl;
return std::string("WRITTEN");
}
std::string read_file(const std::string filename)
{
std::cout << "read_file: filename is " << filename << std::endl;
// slow things down so i can follow
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::ifstream ifs(filename);
if (!ifs.is_open())
{
return std::string("ERROR OPEINING FILE");
}
std::string contents;
if (std::getline(ifs, contents))
{
std::cout << " read_file: read from " << filename << std::endl;
return std::string(contents);
}
return std::string("ERROR READING CONTENTS");
}
void add_request()
{
// randomly add a read or a write request
if (random_int(1, 50) > 25)
requests.push_back(std::async(std::launch::async, write_file, generate_filename()));
else
requests.push_back(std::async(std::launch::async, read_file, generate_filename()));
}
int main(int argc, char* argv[])
{
int max_requests = 10;
// avoid falling out of the loop on first pass
add_request();
do {
std::cout << "working: requests in queue = " << requests.size() << std::endl;
// randomly add a request if we still have not added the max
if (random_int(1, 5) == 1)
{
if (--max_requests > 0)
{
add_request();
}
}
// service the future for each item in the request queue
for (auto iter = requests.begin(); iter != requests.end(); )
{
if ((*iter).wait_for(std::chrono::milliseconds(1)) == std::future_status::ready)
{
std::cout << "Request completed, removing it from the queue: result: " << (*iter).get() << std::endl;
iter = requests.erase(iter);
}
else
{
++iter;
}
}
// once the queue is empty we exit - in the real app, we do not
// and keep processing requests until the app exits normally
} while (requests.size() > 0);
}
As an alternative to what David Schwartz suggests, instead of keeping shared state and using a std::mutex
to guard it, you could instead make use of your operating system's ability to place locks on files. For example, on any UNIX-like operating system, you can use flock()
to lock a file, either in shared mode (to allow multiple concurrent readers), or exclusive mode (for a single writer). This would even allow multiple instances of your program to run, accessing the same files without stepping on each others toes. The drawback is that it is not portable, and even on platforms that support it there is no way to get the UNIX file descriptor from a std::ifstream
, so you would have to use the POSIX API to read and write files instead of functions from <iostream>
. However, since the locks are advisory, you could first call POSIX open()
on the file, lock it, and then create a std::ifstream
or std::ofstream
.
Another issue with multiple threads accessing the same file is that, even if you do proper locking, there is no guarantee in which order the threads are run. Perhaps it is better to not start all operations in parallel, but rather have a per-file queue of pending operations, and have only one thread per file processing these pending operations.
Answered by G. Sliepen on December 20, 2021
Here's the algorithm each thread should follow:
Note that if you use a condition variable to make the wait more efficient, then steps 6, 7 and 8 turn into waiting on the condition variable and then jumping to step 2. Also, you would need to broadcast the condition variable (notify all) before or after step 14. (Preferably before.)
Answered by David Schwartz on December 20, 2021
Get help from others!
Recent Questions
Recent Answers
© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP