Commit 081b45b3 authored by Roland Haas

mpitar: simplify and correct seek / write interaction

parent 5da8a7f1
@@ -226,6 +226,11 @@ int async_writer::seek(const off_t offset)
     seeks[active_seeks].trigger = floor + height;
     seeks[active_seeks].offset = offset;
     active_seeks += 1;
+    // check sanity of seeks
+    for(size_t i = 1 ; i < active_seeks ; i++) {
+      assert(seeks[i-1].trigger <= seeks[i].trigger);
+    }
   }
   pthread_mutex_unlock(&ring_lock);
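The assertions added to seek() spell out the invariant the writer thread relies on: pending seeks are queued in the order their data was enqueued, so the recorded triggers never decrease and the writer only ever has to look at seeks[0]. The following is a minimal, self-contained sketch of that invariant, not mpitar's actual data structures; the SeekRequest type and the check_seek_order() helper are invented names used purely for illustration.

#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for mpitar's seek bookkeeping: each pending seek
// records the ring position ("trigger") at which the writer must reposition
// the output file before it may write any further data.
struct SeekRequest {
  std::size_t trigger; // ring position at which the seek becomes due
  long offset;         // target file offset handed to lseek()
};

// The ordering invariant: triggers are non-decreasing, so the oldest
// pending seek (seeks[0]) is always the next one to become due.
void check_seek_order(const std::vector<SeekRequest> &seeks) {
  for (std::size_t i = 1; i < seeks.size(); i++)
    assert(seeks[i - 1].trigger <= seeks[i].trigger);
}

int main() {
  // Two seeks may share a trigger (back-to-back header rewrites); the
  // invariant only forbids triggers that go backwards.
  std::vector<SeekRequest> pending = {{100, 0}, {100, 512}, {350, 4096}};
  check_seek_order(pending);
  return 0;
}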
@@ -284,87 +289,100 @@ void async_writer::writer()
     size_t seek_trigger = seeks[0].trigger;
     off_t seek_offset = seeks[0].offset;
     bool have_seek = active_seeks > 0;
+    bool my_do_exit = do_exit;
     my_error_code = error_code;
     pthread_mutex_unlock(&ring_lock);

+    assert(!have_seek || seek_trigger >= chunk_start);
+
-    if(do_exit)
+    if(my_do_exit || my_error_code)
       break;

-    // this ensures we are not doing anyting anymore if a reader finds an error
-    // and also do not overwrite the readers error_code
-    if(my_error_code == 0) {
-      if(have_seek && chunk_start == seek_trigger) {
+    // Seek if we have hit the trigger. We will always exactly hit the
+    // trigger.
+    if(have_seek && chunk_start == seek_trigger) {
       off_t pos = lseek(out_fd, seek_offset, SEEK_SET);
       if(pos == -1)
         my_error_code = errno;
       have_seek = false;

       timer_write_worker_wait.start(__LINE__);
       pthread_mutex_lock(&ring_lock);
       timer_write_worker_wait.stop(__LINE__);

+      if(my_error_code == 0) {
         active_seeks -= 1;
         memmove(&seeks[1], &seeks[0], active_seeks * sizeof(seeks[0]));
-        // used up the current trigger, so update values
-        seek_trigger = seeks[0].trigger;
-        seek_offset = seeks[0].offset;
-        have_seek = active_seeks > 0;
+      } else if(error_code == 0) {
+        error_code = my_error_code;
+      }

       pthread_cond_signal(&ring_wait);
       pthread_mutex_unlock(&ring_lock);
-      }
+    } else {
+      // no trigger, write some data

       // reduce write to only write up to seek trigger
       if(have_seek && chunk_start + chunk_count > seek_trigger)
         chunk_count = seek_trigger - chunk_start;

       assert(IOV_MAX >= 2);
       struct iovec iovecs[2];
       int nvec;
       if(chunk_start + chunk_count > ring.size()) { // wraps around
         nvec = 2;
         iovecs[0].iov_base = &ring[chunk_start];
         iovecs[0].iov_len = ring.size() - chunk_start;
         iovecs[1].iov_base = &ring[0];
         iovecs[1].iov_len = chunk_start + chunk_count - ring.size();
       } else { // can write in single chunk
         nvec = 1;
         iovecs[0].iov_base = &ring[chunk_start];
         iovecs[0].iov_len = chunk_count;
       }
       ssize_t bytes_written = writev(out_fd, iovecs, nvec);
       if(bytes_written == -1)
         my_error_code = errno;

       timer_write_worker_wait.start(__LINE__);
       pthread_mutex_lock(&ring_lock);
       timer_write_worker_wait.stop(__LINE__);

       if(my_error_code == 0) {
         floor += bytes_written;
         if(floor >= ring.size()) { // wrapround happened
           // need to update all triggers which were above the wraparound point
           for(size_t i = 0 ; i < active_seeks ; i++) {
-            assert(seeks[i].trigger > floor);
+            assert(seeks[i].trigger >= floor);
             seeks[i].trigger -= ring.size();
           }
           floor -= ring.size();
         }
         height -= bytes_written;
       } else if(error_code == 0) {
         error_code = my_error_code;
       } // if my_error_code

       pthread_cond_signal(&ring_wait);
       pthread_mutex_unlock(&ring_lock);
-    } // if my_error_code
+    } // if chunk_start == seek_trigger
   } // while my_error_code

   // this publishes any error that I encountered but only if no one else
   // already encountered an error
   if(my_error_code != 0) {
     timer_write_worker_wait.start(__LINE__);
     pthread_mutex_lock(&ring_lock);
     timer_write_worker_wait.stop(__LINE__);
     if(error_code == 0) {
       error_code = my_error_code;
       pthread_cond_signal(&ring_wait);
     }
     pthread_mutex_unlock(&ring_lock);
   }
 }
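The restructured writer() now does exactly one of two things per loop iteration: if the amount of data written so far has reached the oldest pending trigger, it performs that single seek (and consumes it only if lseek() succeeded); otherwise it writes data but caps the write so it never runs past the next trigger, which is why the writer "will always exactly hit the trigger". The single-threaded sketch below illustrates only that control flow; the PendingSeek type, the std::deque, the fake data buffer and the printf tracing are invented for this example, while the real code runs in a pthread worker, writes through writev()/lseek() on a file descriptor, and keeps its bookkeeping (floor, height, trigger wraparound) under ring_lock.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <deque>
#include <vector>

// Single-threaded illustration of the seek / write interaction after this
// commit (sketch only, invented names; not mpitar's actual implementation).
struct PendingSeek {
  std::size_t trigger; // stream position at which to reposition the output
  long offset;         // output position to seek to
};

int main() {
  std::vector<char> data(100, 'x'); // stand-in for the ring-buffer contents
  std::deque<PendingSeek> seeks = {{30, 512}, {30, 1024}, {75, 0}};

  std::size_t pos = 0; // bytes of data consumed so far
  long out_pos = 0;    // simulated output file position

  while (pos < data.size()) {
    if (!seeks.empty() && pos == seeks.front().trigger) {
      // Hit a trigger exactly: perform this one seek, consume it, write nothing.
      out_pos = seeks.front().offset;
      seeks.pop_front();
      std::printf("seek            -> out_pos=%ld\n", out_pos);
    } else {
      // No seek due: write, but never past the next pending trigger.
      std::size_t count = data.size() - pos;
      if (!seeks.empty())
        count = std::min(count, seeks.front().trigger - pos);
      pos += count;
      out_pos += static_cast<long>(count);
      std::printf("write %3zu bytes -> pos=%zu out_pos=%ld\n", count, pos, out_pos);
    }
  }
  return 0;
}

Because a single iteration never both seeks and writes, the writer no longer has to refresh seek_trigger/seek_offset/have_seek mid-iteration the way the removed lines did; the next iteration simply re-reads seeks[0] under the lock.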