Commit 896cee63 authored by Roland Haas's avatar Roland Haas
Browse files

mpitar: use a fixed size ring buffer rather than queue

parent fc0addb1
......@@ -2,34 +2,31 @@
#include "async_writer.hh"
#include "timer.hh"
#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <limits.h>
timer timer_write_worker_wait("write_worker_wait");
timer timer_write_worker_write("write_worker_write");
timer timer_write_master_wait("write_master_wait");
async_writer::async_writer(FILE* out_fh, const size_t max_bytes_queued_) :
bytes_queued(0), fh(out_fh), max_bytes_queued(max_bytes_queued_),
thread_active(false)
async_writer::async_writer(const int out_fd_, const size_t capacity_) :
out_fd(out_fd_), error_code(0), ring(capacity_), floor(0), height(0),
thread_active(false), do_exit(false)
{
int bytes_mutex_init = pthread_mutex_init(&bytes_lock, NULL);
int bytes_cond_init = pthread_cond_init(&bytes_wait, NULL);
if(bytes_mutex_init != 0 || bytes_cond_init != 0) {
fprintf(stderr, "Failed to initialize mutexes for bytes variable: %d %d\n",
bytes_mutex_init, bytes_cond_init);
exit(1);
}
int queue_mutex_init = pthread_mutex_init(&queue_lock, NULL);
int queue_cond_init = pthread_cond_init(&queue_wait, NULL);
if(queue_mutex_init != 0 || queue_cond_init != 0) {
fprintf(stderr, "Failed to initialize mutexes for queue variable: %d %d\n",
queue_mutex_init, queue_cond_init);
int ring_mutex_init = pthread_mutex_init(&ring_lock, NULL);
int ring_cond_init = pthread_cond_init(&ring_wait, NULL);
if(ring_mutex_init != 0 || ring_cond_init != 0) {
fprintf(stderr, "Failed to initialize mutexes for ring variable: %d %d\n",
ring_mutex_init, ring_cond_init);
exit(1);
}
......@@ -44,71 +41,182 @@ async_writer::async_writer(FILE* out_fh, const size_t max_bytes_queued_) :
async_writer::~async_writer()
{
if(thread_active)
finalize();
close();
assert(!thread_active);
}
void async_writer::write(const void* buf, size_t count, void (*free_func)(void*))
int async_writer::write(const void* buf, size_t count)
{
assert(thread_active);
cmd_block_t write_cmd;
write_cmd.write_block.cmd = CMD_WRITE;
write_cmd.write_block.count = count;
write_cmd.write_block.buf = buf;
write_cmd.write_block.free_func = free_func;
int my_error_code = 0;
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&bytes_lock);
while(bytes_queued >= max_bytes_queued)
pthread_cond_wait(&bytes_wait, &bytes_lock);
pthread_mutex_lock(&queue_lock);
timer_write_master_wait.stop(__LINE__);
const unsigned char *p = static_cast<const unsigned char *>(buf);
while(count > 0) {
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
while(height >= ring.size() && error_code == 0)
pthread_cond_wait(&ring_wait, &ring_lock);
timer_write_master_wait.stop(__LINE__);
size_t chunk_count = std::min(count, ring.size() - height);
size_t chunk_start = floor + height;
if(chunk_start >= ring.size())
chunk_start -= ring.size();
// assert() that even wrapping around we are not writing in between floor
// and floor+height
assert(chunk_start + chunk_count <= floor + ring.size());
my_error_code = error_code;
bytes_queued += count;
cmd_queue.push(write_cmd);
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
pthread_cond_signal(&queue_wait);
pthread_mutex_unlock(&queue_lock);
pthread_mutex_unlock(&bytes_lock);
if(my_error_code)
break;
if(chunk_start + chunk_count > ring.size()) { // wraps around
memcpy(&ring[chunk_start], p, ring.size() - chunk_start);
p += ring.size() - chunk_start;
memcpy(&ring[0], p, chunk_start + chunk_count - ring.size());
p += chunk_start + chunk_count - ring.size();
} else { // add in one go
memcpy(&ring[chunk_start], p, chunk_count);
p += chunk_count;
}
count -= chunk_count;
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_master_wait.stop(__LINE__);
height += chunk_count;
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
}
assert(my_error_code == 0 || count == 0);
return my_error_code;
}
void async_writer::seek(long offset)
int async_writer::copy(const int in_fd, size_t count)
{
assert(thread_active);
cmd_block_t seek_cmd;
seek_cmd.seek_block.cmd = CMD_SEEK;
seek_cmd.seek_block.offset = offset;
int my_error_code = 0;
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&queue_lock);
timer_write_master_wait.stop(__LINE__);
while(count > 0 && my_error_code == 0) {
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
while(height >= ring.size() && error_code == 0)
pthread_cond_wait(&ring_wait, &ring_lock);
timer_write_master_wait.stop(__LINE__);
cmd_queue.push(seek_cmd);
size_t chunk_count = std::min(count, ring.size() - height);
size_t chunk_start = floor + height;
if(chunk_start >= ring.size())
chunk_start -= ring.size();
// assert() that even wrapping around we are not writing in between floor
// and floor+height
assert(chunk_start + chunk_count <= floor + ring.size());
my_error_code = error_code;
pthread_cond_signal(&queue_wait);
pthread_mutex_unlock(&queue_lock);
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
// don't do anything anymore an error has been reported by the writer
if(my_error_code == 0) {
assert(IOV_MAX >= 2);
struct iovec iovecs[2];
int nvec;
if(chunk_start + chunk_count > ring.size()) { // wraps around
nvec = 2;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = ring.size() - chunk_start;
iovecs[1].iov_base = &ring[0];
iovecs[1].iov_len = chunk_start + chunk_count - ring.size();
} else { // can write in single chunk
nvec = 1;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = chunk_count;
}
ssize_t bytes_read = readv(in_fd, iovecs, nvec);
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_master_wait.stop(__LINE__);
if(bytes_read != -1) {
assert((size_t)bytes_read == chunk_count);
height += bytes_read;
} else {
my_error_code = error_code = errno;
}
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
count -= bytes_read;
}
}
assert(my_error_code != 0 || count == 0);
return my_error_code;
}
void async_writer::finalize()
int async_writer::seek(const off_t offset)
{
if(!thread_active)
return;
assert(thread_active);
int my_error_code = 0;
// this is rather slow as I wait for the buffer to empty
// wait for buffer to fully drain
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
while(height > 0 && error_code == 0)
pthread_cond_wait(&ring_wait, &ring_lock);
timer_write_master_wait.stop(__LINE__);
my_error_code = error_code;
pthread_mutex_unlock(&ring_lock);
// make writer thread exit
cmd_block_t exit_cmd;
exit_cmd.exit_block.cmd = CMD_EXIT;
if(my_error_code == 0) {
off_t ierr = lseek(out_fd, offset, SEEK_SET);
if(ierr == -1) {
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_master_wait.stop(__LINE__);
pthread_mutex_lock(&queue_lock);
my_error_code = error_code = errno;
cmd_queue.push(exit_cmd);
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
}
}
pthread_cond_signal(&queue_wait);
pthread_mutex_unlock(&queue_lock);
return my_error_code;
}
// this waits for writer thread to finish
int async_writer::close()
{
if(!thread_active)
return 0;
// wait for buffer to fully drain
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
while(height > 0 && error_code == 0)
pthread_cond_wait(&ring_wait, &ring_lock);
timer_write_master_wait.stop(__LINE__);
do_exit = true;
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
// kill writer thread exit
int join_ierr = pthread_join(writer_thread, NULL);
if(join_ierr != 0) {
fprintf(stderr, "Failed to join with writer thread: %d\n", join_ierr);
......@@ -116,22 +224,11 @@ void async_writer::finalize()
}
thread_active = false;
}
size_t async_writer::select(const size_t bytes_requested)
{
assert(bytes_requested <= max_bytes_queued);
pthread_mutex_lock(&bytes_lock);
while(max_bytes_queued - bytes_queued < bytes_requested)
pthread_cond_wait(&bytes_wait, &bytes_lock);
const size_t bytes_available = max_bytes_queued - bytes_queued;
pthread_mutex_unlock(&bytes_lock);
return bytes_available;
// no other thread anymore, no need for the locks
return error_code;
}
void* async_writer::writer_func(void* calldata)
{
async_writer* obj = static_cast<async_writer*>(calldata);
......@@ -141,51 +238,60 @@ void* async_writer::writer_func(void* calldata)
void async_writer::writer()
{
bool done = false;
while(!done) {
pthread_mutex_lock(&queue_lock);
while(cmd_queue.empty())
pthread_cond_wait(&queue_wait, &queue_lock);
cmd_block_t cmd_block = cmd_queue.front();
cmd_queue.pop();
pthread_mutex_unlock(&queue_lock);
switch(cmd_block.cmd_hdr.cmd) {
case CMD_WRITE: {
size_t written =
fwrite(cmd_block.write_block.buf, 1, cmd_block.write_block.count, fh);
if(written != cmd_block.write_block.count) {
fprintf(stderr, "Could not write %zd bytes: %s\n",
cmd_block.write_block.count, strerror(errno));
exit(1);
}
if(cmd_block.write_block.free_func)
cmd_block.write_block.free_func(const_cast<void*>(cmd_block.write_block.buf));
pthread_mutex_lock(&bytes_lock);
bytes_queued -= written;
pthread_cond_signal(&bytes_wait);
pthread_mutex_unlock(&bytes_lock);
} break;
case CMD_SEEK: {
int seeked = fseek(fh, cmd_block.seek_block.offset, SEEK_SET);
if(seeked != 0) {
fprintf(stderr, "Could not seek to position %ld: %s\n", cmd_block.seek_block.offset, strerror(errno));
exit(1);
}
} break;
case CMD_EXIT: {
// not really required but keeps writes here rather than in caller
int flushed = fflush(fh);
if(flushed != 0) {
fprintf(stderr, "Could not flush: %s\n", strerror(errno));
exit(1);
}
done = true;
} break;
default:
fprintf(stderr, "Invalid command %d\n", cmd_block.cmd_hdr.cmd);
exit(1);
break;
}
}
int my_error_code = 0;
while(my_error_code == 0) {
timer_write_worker_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
while(height == 0 && error_code == 0 && !do_exit)
pthread_cond_wait(&ring_wait, &ring_lock);
timer_write_worker_wait.stop(__LINE__);
size_t chunk_count = height;
size_t chunk_start = floor;
my_error_code = error_code;
pthread_mutex_unlock(&ring_lock);
if(do_exit)
break;
// this ensures we are not doing anyting anymore if a reader finds an error
// and also do not overwrite the readers error_code
if(my_error_code == 0) {
assert(IOV_MAX >= 2);
struct iovec iovecs[2];
int nvec;
if(chunk_start + chunk_count > ring.size()) { // wraps around
nvec = 2;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = ring.size() - chunk_start;
iovecs[1].iov_base = &ring[0];
iovecs[1].iov_len = chunk_start + chunk_count - ring.size();
} else { // can write in single chunk
nvec = 1;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = chunk_count;
}
ssize_t bytes_written = writev(out_fd, iovecs, nvec);
timer_write_worker_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_worker_wait.stop(__LINE__);
if(bytes_written == -1) {
my_error_code = errno;
} else {
floor += bytes_written;
if(floor >= ring.size())
floor -= ring.size();
height -= bytes_written;
}
error_code = my_error_code;
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
} // if my_error_code
} // while my_error_code
}
......@@ -5,7 +5,8 @@
#include <cstdio>
#include <cstdlib>
#include <pthread.h>
#include <queue>
#include <vector>
// a class to provide asynchronous IO to a file
// it is the caller's responsibility to fopen() the file, fclose() it, and to
......@@ -13,7 +14,7 @@
class async_writer
{
public:
async_writer(FILE* out_fh, size_t max_bytes_queued_ = 1000000000);
async_writer(const int out_fd, const size_t capacity_);
~async_writer();
private:
// no copying or assignment
......@@ -21,59 +22,42 @@ class async_writer
async_writer& operator=(const async_writer& other);
public:
// write buf to file, call free_func() on block of memory once done
// it is NULL)
void write(const void* buf, size_t count, void (*free_func)(void*) = free);
// all function can return an error code generated by previous async calls
// once an error occured, all later calls will immediately return with an
// error
// write count bytes from buf to file
int write(const void* buf, size_t count);
// copy count bytes from buf from in_fd to out_fd
int copy(const int in_fd, size_t count);
// seek to location offset in the file
void seek(long offset);
int seek(const off_t offset);
// flush command queue and wait for writer to finish
void finalize();
// waits until there are at least bytes_available bytes to add to the queue,
// returns how many bytes are actually available to be written without
// blocking. It is an error to ask for more bytes than max_bytes_queued.
size_t select(const size_t bytes_available);
int close();
private:
// data types to keep track of what the writer end needs to do
enum cmd_name {CMD_INVALID = 0, CMD_WRITE, CMD_SEEK, CMD_EXIT, CMD_END};
union cmd_block_t {
struct cmd_hdr_t {
cmd_name cmd;
} cmd_hdr;
struct write_block_t {
cmd_name cmd;
const void *buf;
size_t count;
void (*free_func)(void*);
} write_block;
struct seek_block_t {
cmd_name cmd;
long offset;
} seek_block;
struct exit_block_t {
cmd_name cmd;
} exit_block;
};
typedef std::queue<cmd_block_t> cmd_queue_t;
cmd_queue_t cmd_queue;
size_t bytes_queued;
// the output file
FILE* fh;
// write() blocks until there are no more than max_bytes_queued bytes queued up
const size_t max_bytes_queued;
int out_fd;
// the last error encountered
int error_code;
// the ring buffer holding data to be written
// needs to be typed so that I can do pointer arithmetic on it
std::vector<unsigned char> ring; // storage for the ring buffer
size_t floor; // unwritten data starts here
size_t height; // amount of unwritten data
// TODO: add seek queue
// threading support
pthread_t writer_thread;
// if holding both mutexes then bytes_lock must be aquired first!
// protects access to bytes_queued variable and signals when it is reduced
pthread_mutex_t bytes_lock;
pthread_cond_t bytes_wait;
// protects access to queue and signals when a new work item is added
pthread_mutex_t queue_lock;
pthread_cond_t queue_wait;
pthread_mutex_t ring_lock;
pthread_cond_t ring_wait;
bool thread_active;
bool do_exit; // request exist of the worker
// these two functions implement the writer thread. The static one is
// passed to pthread_create and wraps the member function
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment