Commit c56ad9f5 authored by Roland Haas's avatar Roland Haas
Browse files

mpitar: handle short readv and writev results

fix bug concerning retval of readv (was exactly the wrong condition
check)
parent 081b45b3
......@@ -126,6 +126,7 @@ int async_writer::write(const void* buf, size_t count)
timer_write_master_wait.stop(__LINE__);
height += chunk_count;
assert(height <= ring.size());
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
......@@ -177,17 +178,38 @@ int async_writer::copy(const int in_fd, size_t count)
iovecs[0].iov_len = chunk_count;
}
ssize_t bytes_read = readv(in_fd, iovecs, nvec);
if(bytes_read != -1)
my_error_code = errno;
ssize_t bytes_to_read = chunk_count;
while(bytes_to_read > 0) {
ssize_t bytes_read = readv(in_fd, iovecs, nvec);
if(bytes_read == -1) {
my_error_code = errno;
break;
}
bytes_to_read -= bytes_read;
assert(bytes_to_read >= 0);
// update chunks to read
for(int i = 0 ; i < nvec ; i++) {
if((size_t)bytes_read >= iovecs[i].iov_len) {
bytes_read -= iovecs[i].iov_len;
iovecs[i].iov_len = 0;
} else {
iovecs[i].iov_base =
static_cast<unsigned char *>(iovecs[i].iov_base) + bytes_read;
iovecs[i].iov_len -= bytes_read;
break;
}
}
assert(bytes_read == 0);
}
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_master_wait.stop(__LINE__);
if(my_error_code == 0) {
assert((size_t)bytes_read == chunk_count);
height += bytes_read;
assert(bytes_to_read == 0);
height += chunk_count;
assert(height <= ring.size());
} else if(error_code == 0) {
error_code = my_error_code;
}
......@@ -195,7 +217,7 @@ int async_writer::copy(const int in_fd, size_t count)
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
count -= bytes_read;
count -= chunk_count;
}
}
......@@ -209,9 +231,7 @@ int async_writer::seek(const off_t offset)
int my_error_code = 0;
// this is rather slow as I wait for the buffer to empty
// wait for buffer to fully drain
// wait for there being room in the seeks queue
timer_write_master_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
while(active_seeks >= DIM(seeks) && error_code == 0)
......@@ -314,11 +334,6 @@ void async_writer::writer()
if(my_error_code == 0) {
active_seeks -= 1;
memmove(&seeks[1], &seeks[0], active_seeks * sizeof(seeks[0]));
// used up the current trigger, so update values
seek_trigger = seeks[0].trigger;
seek_offset = seeks[0].offset;
have_seek = active_seeks > 0;
} else if(error_code == 0) {
error_code = my_error_code;
}
......@@ -346,26 +361,48 @@ void async_writer::writer()
iovecs[0].iov_len = chunk_count;
}
ssize_t bytes_written = writev(out_fd, iovecs, nvec);
if(bytes_written == -1)
my_error_code = errno;
ssize_t bytes_to_write = chunk_count;
while(bytes_to_write > 0) {
ssize_t bytes_written = writev(out_fd, iovecs, nvec);
if(bytes_written == -1) {
my_error_code = errno;
break;
}
bytes_to_write -= bytes_written;
assert(bytes_to_write >= 0);
// update chunks to write
for(int i = 0 ; i < nvec ; i++) {
if((size_t)bytes_written >= iovecs[i].iov_len) {
bytes_written -= iovecs[i].iov_len;
iovecs[i].iov_len = 0;
} else {
iovecs[i].iov_base = (char*)iovecs[i].iov_base + bytes_written;
iovecs[i].iov_len -= bytes_written;
break;
}
}
assert(bytes_written == 0);
}
timer_write_worker_wait.start(__LINE__);
pthread_mutex_lock(&ring_lock);
timer_write_worker_wait.stop(__LINE__);
if(my_error_code == 0) {
floor += bytes_written;
assert(bytes_to_write == 0);
floor += chunk_count;
if(floor >= ring.size()) { // wrapround happened
// need to update all triggers which were above the wraparound point
for(size_t i = 0 ; i < active_seeks ; i++) {
assert(seeks[i].trigger >= floor);
seeks[i].trigger -= ring.size();
assert(seeks[i].trigger < ring.size());
}
floor -= ring.size();
}
height -= bytes_written;
assert(chunk_count <= height);
height -= chunk_count;
}
pthread_cond_signal(&ring_wait);
pthread_mutex_unlock(&ring_lock);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment