Patching Dec 9, 2021 6-7a CST- All GitLab services may be unavailable for 5-10 minutes

Commit 2630250c authored by Roland Haas's avatar Roland Haas
Browse files

async_writer: keep track of whether thread is active or not

parent 24e693ff
......@@ -9,7 +9,8 @@
#include <unistd.h>
async_writer::async_writer(FILE* out_fh, const size_t max_bytes_queued_) :
bytes_queued(0), fh(out_fh), max_bytes_queued(max_bytes_queued_)
bytes_queued(0), fh(out_fh), max_bytes_queued(max_bytes_queued_),
thread_active(false)
{
int bytes_mutex_init = pthread_mutex_init(&bytes_lock, NULL);
int bytes_cond_init = pthread_cond_init(&bytes_wait, NULL);
......@@ -32,15 +33,19 @@ async_writer::async_writer(FILE* out_fh, const size_t max_bytes_queued_) :
fprintf(stderr, "Failed to create writer thread: %d\n", thread_create);
exit(1);
}
thread_active = true;
};
async_writer::~async_writer()
{
finalize();
if(thread_active)
finalize();
}
void async_writer::write(const void* buf, size_t count)
{
assert(thread_active);
cmd_block_t write_cmd;
write_cmd.write_block.cmd = CMD_WRITE;
write_cmd.write_block.count = count;
......@@ -61,6 +66,8 @@ void async_writer::write(const void* buf, size_t count)
void async_writer::seek(long offset)
{
assert(thread_active);
cmd_block_t seek_cmd;
seek_cmd.seek_block.cmd = CMD_SEEK;
seek_cmd.seek_block.offset = offset;
......@@ -75,6 +82,9 @@ void async_writer::seek(long offset)
void async_writer::finalize()
{
if(!thread_active)
return;
// make writer thread exit
cmd_block_t exit_cmd;
exit_cmd.exit_block.cmd = CMD_EXIT;
......@@ -92,8 +102,24 @@ void async_writer::finalize()
fprintf(stderr, "Failed to join with writer thread: %d\n", join_ierr);
exit(1);
}
thread_active = false;
}
size_t async_writer::select(const size_t bytes_requested)
{
assert(bytes_requested <= max_bytes_queued);
pthread_mutex_lock(&bytes_lock);
while(max_bytes_queued - bytes_queued < bytes_requested)
pthread_cond_wait(&bytes_wait, &bytes_lock);
const size_t bytes_available = max_bytes_queued - bytes_queued;
pthread_mutex_unlock(&bytes_lock);
return bytes_available;
}
void* async_writer::writer_func(void* calldata)
{
async_writer* obj = static_cast<async_writer*>(calldata);
......
......@@ -27,6 +27,10 @@ class async_writer
void seek(long offset);
// flush command queue and wait for writer to finish
void finalize();
// waits until there are at least bytes_available bytes to add to the queue,
// returns how many bytes are actually available to be written without
// blocking. It is an error to ask for more bytes than max_bytes_queued.
size_t select(const size_t bytes_available);
private:
// data types to keep track of what the writer end needs to do
......@@ -66,6 +70,8 @@ class async_writer
// protects access to queue and signals when a new work item is added
pthread_mutex_t queue_lock;
pthread_cond_t queue_wait;
bool thread_active;
// these two functions implement the writer thread. The static one is
// passed to pthread_create and wraps the member function
......
......@@ -11,6 +11,7 @@ int main(void)
writer.write(strdup("foo\n"), 4);
writer.seek(1);
writer.write(strdup("aa"), 2);
writer.finalize();
}
return 0;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment