Commit 54243375 authored by Roland Haas's avatar Roland Haas
Browse files

mpitar: add seek queue

parent 896cee63
......@@ -14,13 +14,15 @@
#include <unistd.h>
#include <limits.h>
#define DIM(v) (sizeof(v)/sizeof(v[0]))
timer timer_write_worker_wait("write_worker_wait");
timer timer_write_worker_write("write_worker_write");
timer timer_write_master_wait("write_master_wait");
async_writer::async_writer(const int out_fd_, const size_t capacity_) :
out_fd(out_fd_), error_code(0), ring(capacity_), floor(0), height(0),
thread_active(false), do_exit(false)
active_seeks(0), thread_active(false), do_exit(false)
{
int ring_mutex_init = pthread_mutex_init(&ring_lock, NULL);
int ring_cond_init = pthread_cond_init(&ring_wait, NULL);
......@@ -142,16 +144,18 @@ int async_writer::copy(const int in_fd, size_t count)
}
ssize_t bytes_read = readv(in_fd, iovecs, nvec);
if(bytes_read != -1)
my_error_code = errno;
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_master_wait.stop(__LINE__);
if(bytes_read != -1) {
if(my_error_code == 0) {
assert((size_t)bytes_read == chunk_count);
height += bytes_read;
} else {
my_error_code = error_code = errno;
} else if(error_code == 0) {
error_code = my_error_code;
}
pthread_cond_signal(&ring_wait);
......@@ -176,28 +180,22 @@ int async_writer::seek(const off_t offset)
// wait for buffer to fully drain
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
while(height > 0 && error_code == 0)
while(active_seeks >= DIM(seeks) && error_code == 0)
pthread_cond_wait(&ring_wait, &ring_lock);
timer_write_master_wait.stop(__LINE__);
my_error_code = error_code;
pthread_mutex_unlock(&ring_lock);
if(my_error_code == 0) {
off_t ierr = lseek(out_fd, offset, SEEK_SET);
if(ierr == -1) {
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_master_wait.stop(__LINE__);
my_error_code = error_code = errno;
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
}
assert(active_seeks == 0 ||
seeks[active_seeks-1].trigger <= floor + height);
seeks[active_seeks].trigger = floor + height;
seeks[active_seeks].offset = offset;
active_seeks += 1;
}
pthread_mutex_unlock(&ring_lock);
return my_error_code;
}
......@@ -249,49 +247,90 @@ void async_writer::writer()
size_t chunk_count = height;
size_t chunk_start = floor;
size_t seek_trigger = seeks[0].trigger;
off_t seek_offset = seeks[0].offset;
bool have_seek = active_seeks > 0;
my_error_code = error_code;
pthread_mutex_unlock(&ring_lock);
assert(!have_seek || seek_trigger >= chunk_start);
if(do_exit)
break;
// this ensures we are not doing anyting anymore if a reader finds an error
// and also do not overwrite the readers error_code
if(my_error_code == 0) {
assert(IOV_MAX >= 2);
struct iovec iovecs[2];
int nvec;
if(chunk_start + chunk_count > ring.size()) { // wraps around
nvec = 2;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = ring.size() - chunk_start;
iovecs[1].iov_base = &ring[0];
iovecs[1].iov_len = chunk_start + chunk_count - ring.size();
} else { // can write in single chunk
nvec = 1;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = chunk_count;
// Seek if we have hit the trigger. We will always exactly hit the
// trigger.
if(have_seek && chunk_start == seek_trigger) {
off_t pos = lseek(out_fd, seek_offset, SEEK_SET);
if(pos == -1)
my_error_code = errno;
have_seek = false;
timer_write_worker_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_worker_wait.stop(__LINE__);
if(my_error_code == 0) {
active_seeks -= 1;
memmove(&seeks[1], &seeks[0], active_seeks * sizeof(seeks[0]));
} else if(error_code == 0) {
error_code = my_error_code;
}
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
}
ssize_t bytes_written = writev(out_fd, iovecs, nvec);
timer_write_worker_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_worker_wait.stop(__LINE__);
if(bytes_written == -1) {
my_error_code = errno;
} else {
floor += bytes_written;
if(floor >= ring.size())
floor -= ring.size();
height -= bytes_written;
}
error_code = my_error_code;
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
if(my_error_code == 0) {
// reduce write to only write up to seek trigger
if(have_seek && chunk_start + chunk_count > seek_trigger)
chunk_count = seek_trigger - chunk_start;
assert(IOV_MAX >= 2);
struct iovec iovecs[2];
int nvec;
if(chunk_start + chunk_count > ring.size()) { // wraps around
nvec = 2;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = ring.size() - chunk_start;
iovecs[1].iov_base = &ring[0];
iovecs[1].iov_len = chunk_start + chunk_count - ring.size();
} else { // can write in single chunk
nvec = 1;
iovecs[0].iov_base = &ring[chunk_start];
iovecs[0].iov_len = chunk_count;
}
ssize_t bytes_written = writev(out_fd, iovecs, nvec);
if(bytes_written == -1)
my_error_code = errno;
timer_write_worker_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_worker_wait.stop(__LINE__);
if(my_error_code == 0) {
floor += bytes_written;
if(floor >= ring.size()) { // wrapround happened
// need to update all triggers which were above the wraparound point
for(size_t i = 0 ; i < active_seeks ; i++) {
assert(seeks[i].trigger > floor);
seeks[i].trigger -= ring.size();
}
floor -= ring.size();
}
height -= bytes_written;
} else if(error_code == 0) {
error_code = my_error_code;
}
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
} // if my_error_code
} // if my_error_code
} // while my_error_code
}
......@@ -47,7 +47,14 @@ class async_writer
size_t floor; // unwritten data starts here
size_t height; // amount of unwritten data
// TODO: add seek queue
// a queue of seek positions too seek to whenever the ring-buffer is
// drained to the correct amount. This could be done as another ring-buffer
// but seems hardly worth it.
struct {
size_t trigger; // seek whenever floor reaches this level
off_t offset; // where to seek to
} seeks[10];
size_t active_seeks; // number of queued seeks
// threading support
pthread_t writer_thread;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment