Commit 61ff18b6 authored by Roland Haas's avatar Roland Haas
Browse files

async_writer: a simple async IO class

parent f38b431b
#include "async_writer.hh"
#include <cassert>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
async_writer::async_writer(FILE* out_fh, const size_t max_bytes_queued_) :
bytes_queued(0), fh(out_fh), max_bytes_queued(max_bytes_queued_)
{
int bytes_mutex_init = pthread_mutex_init(&bytes_lock, NULL);
int bytes_cond_init = pthread_cond_init(&bytes_wait, NULL);
assert(bytes_mutex_init == 0 && bytes_cond_init == 0);
int queue_mutex_init = pthread_mutex_init(&queue_lock, NULL);
int queue_cond_init = pthread_cond_init(&queue_wait, NULL);
assert(queue_mutex_init == 0 && queue_cond_init == 0);
int thread_create = pthread_create(&writer_thread, NULL, writer_func,
static_cast<void*>(this));
assert(thread_create == 0);
};
async_writer::~async_writer()
{
// make writer thread exit
cmd_block_t exit_cmd;
exit_cmd.exit_block.cmd = CMD_EXIT;
pthread_mutex_lock(&queue_lock);
cmd_queue.push(exit_cmd);
pthread_cond_signal(&queue_wait);
pthread_mutex_unlock(&queue_lock);
// this waits for writer thread to finish
int join = pthread_join(writer_thread, NULL);
assert(join == 0);
}
void async_writer::write(const void* buf, size_t count)
{
cmd_block_t write_cmd;
write_cmd.write_block.cmd = CMD_WRITE;
write_cmd.write_block.count = count;
write_cmd.write_block.buf = buf;
pthread_mutex_lock(&bytes_lock);
while(bytes_queued >= max_bytes_queued)
pthread_cond_wait(&bytes_wait, &bytes_lock);
pthread_mutex_lock(&queue_lock);
bytes_queued += count;
cmd_queue.push(write_cmd);
pthread_cond_signal(&queue_wait);
pthread_mutex_unlock(&queue_lock);
pthread_mutex_unlock(&bytes_lock);
}
void async_writer::seek(long offset)
{
cmd_block_t seek_cmd;
seek_cmd.seek_block.cmd = CMD_SEEK;
seek_cmd.seek_block.offset = offset;
pthread_mutex_lock(&queue_lock);
cmd_queue.push(seek_cmd);
pthread_cond_signal(&queue_wait);
pthread_mutex_unlock(&queue_lock);
}
void* async_writer::writer_func(void* calldata)
{
async_writer* obj = static_cast<async_writer*>(calldata);
obj->writer();
return NULL;
}
void async_writer::writer()
{
bool done = false;
while(!done) {
pthread_mutex_lock(&queue_lock);
while(cmd_queue.empty())
pthread_cond_wait(&queue_wait, &queue_lock);
cmd_block_t cmd_block = cmd_queue.front();
cmd_queue.pop();
pthread_mutex_unlock(&queue_lock);
switch(cmd_block.cmd_hdr.cmd) {
case CMD_WRITE: {
size_t written =
fwrite(cmd_block.write_block.buf, 1, cmd_block.write_block.count, fh);
assert(written == cmd_block.write_block.count);
free(const_cast<void*>(cmd_block.write_block.buf));
pthread_mutex_lock(&bytes_lock);
bytes_queued -= written;
pthread_cond_signal(&bytes_wait);
pthread_mutex_unlock(&bytes_lock);
} break;
case CMD_SEEK: {
int seeked = fseek(fh, cmd_block.seek_block.offset, SEEK_SET);
assert(seeked == 0);
} break;
case CMD_EXIT: {
// not really required but keeps writes here rather than in caller
int flushed = fflush(fh);
assert(flushed == 0);
done = true;
} break;
default:
assert(0);
break;
}
}
}
#ifndef ASYNC_WRITER_HH_
#define ASYNC_WRITER_HH_
#include <cstddef>
#include <cstdio>
#include <pthread.h>
#include <queue>
// a class to provide asynchronous IO to a file
// it is the caller's responsibility to fopen() the file, fclose() it, and to
// malloc() data for write()
class async_writer
{
public:
async_writer(FILE* out_fh, size_t max_bytes_queued_ = 1000000000);
~async_writer();
private:
// no copying or assignment
async_writer(const async_writer& other);
async_writer& operator=(const async_writer& other);
public:
// write buf to file, free() memory once done ie. buf must be obtained from
// malloc()
void write(const void* buf, size_t count);
void seek(long offset);
private:
// data types to keep track of what the writer end needs to do
enum cmd_name {CMD_INVALID = 0, CMD_WRITE, CMD_SEEK, CMD_EXIT, CMD_END};
union cmd_block_t {
struct cmd_hdr_t {
cmd_name cmd;
} cmd_hdr;
struct write_block_t {
cmd_name cmd;
const void *buf;
size_t count;
} write_block;
struct seek_block_t {
cmd_name cmd;
long offset;
} seek_block;
struct exit_block_t {
cmd_name cmd;
} exit_block;
};
typedef std::queue<cmd_block_t> cmd_queue_t;
cmd_queue_t cmd_queue;
size_t bytes_queued;
// the output file
FILE* fh;
// write() blocks until there are no more than max_bytes_queued bytes queued up
const size_t max_bytes_queued;
// threading support
pthread_t writer_thread;
// if holding both mutexes then bytes_lock must be aquired first!
// protects access to bytes_queued variable and signals when it is reduced
pthread_mutex_t bytes_lock;
pthread_cond_t bytes_wait;
// protects access to queue and signals when a new work item is added
pthread_mutex_t queue_lock;
pthread_cond_t queue_wait;
// these two functions implement the writer thread. The static one is
// passed to pthread_create and wraps the member function
static void* writer_func(void* calldata);
void writer();
};
#endif // ASYNC_WRITER_HH_
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment