Commit f431f2e7 authored by Roland Haas's avatar Roland Haas
Browse files

mpitar: kludge in asynchronous IO support

Not very efficient or pretty (too many malloc/free calls), but good enough
to test with for now.

Possible improvements:
* use less malloc/free
* use ring buffer (or bipbuffer)
* use POSIX asyncio and get rid of the thread (combine with either one of
the above)
parent c14c052b
......@@ -8,8 +8,8 @@ export XTPE_LINK_TYPE=dynamic
all: mpitar test
mpitar: mpitar.cc timer.hh tarentry.cc tarentry.hh fileentry.hh cmdline.cc cmdline.hh
$(CC) $(LDFLAGS) -Wall -g3 -Og mpitar.cc tarentry.cc cmdline.cc -o mpitar
mpitar: mpitar.cc timer.hh tarentry.cc tarentry.hh fileentry.hh cmdline.cc cmdline.hh async_writer.cc async_writer.hh
$(CC) $(LDFLAGS) -Wall -g3 -Og mpitar.cc tarentry.cc cmdline.cc async_writer.cc -o mpitar
test: mpitar test.sh
@rm -rf test
......
......@@ -66,6 +66,7 @@
#include<string>
//#define DO_TIMING
#include "async_writer.hh"
#include "cmdline.hh"
#include "fileentry.hh"
#include "timer.hh"
......@@ -86,7 +87,7 @@ timer timer_worker_wait("worker_wait"), timer_master_wait("master_wait");
#define DIM(v) (sizeof(v)/sizeof(v[0]))
static void copy_file_content(FILE *out_fh, const char *out_fn,
static void copy_file_content(async_writer& out_writer, const char *out_fn,
const tarentry &ent);
static int find_unused_request(int count, MPI_Request *request);
......@@ -170,6 +171,7 @@ void master(const char *out_fn, fileentries& entries)
// and truncated the file
// I cannot use r+ since this seems to make fseek read in data into the buffer
MPI_Barrier(MPI_COMM_WORLD);
async_writer out_writer(out_fh);
char idx_fn[1024];
size_t idx_fn_size = snprintf(idx_fn, sizeof(idx_fn), "%s.idx", out_fn);
......@@ -313,26 +315,17 @@ void master(const char *out_fn, fileentries& entries)
timer_stat.start(__LINE__);
tarentry idx_ent(idx_fn, off);
timer_stat.stop(__LINE__);
copy_file_content(out_fh, out_fn, idx_ent);
copy_file_content(out_writer, out_fn, idx_ent);
off += idx_ent.size();
/* terminate tar file */
static char buffer[2*BLOCKSIZE];
off_t ierr_seek = fseek(out_fh, (long)off, SEEK_SET);
if(ierr_seek == -1) {
fprintf(stderr, "Could not seek '%s' to %zu: %s\n", out_fn,
size_t(off), strerror(errno));
exit(1);
}
out_writer.seek(off);
timer_write.start(__LINE__);
size_t written = fwrite(buffer, 1, 2*BLOCKSIZE, out_fh);
out_writer.write(buffer, 2*BLOCKSIZE, NULL);
timer_write.stop(__LINE__);
if(written != 2*BLOCKSIZE) {
fprintf(stderr, "Could not write %zu bytes to '%s': %s\n",
size_t(2*BLOCKSIZE), out_fn, strerror(errno));
exit(1);
}
timer_write.start(__LINE__);
out_writer.finalize();
int ierr_close = fclose(out_fh);
timer_write.stop(__LINE__);
if(ierr_close != 0) {
......@@ -383,6 +376,7 @@ void worker(const char *out_fn)
timer_open.stop(__LINE__);
static char buffer[STREAM_BUFFER_SIZE];
setbuffer(out_fh, buffer, sizeof(buffer));
async_writer out_writer(out_fh);
int done = 0;
do {
......@@ -426,7 +420,7 @@ void worker(const char *out_fn)
assert(sizeof(size_t) <= sizeof(unsigned long long int));
static unsigned long long int chunk_written = 0;
const filedesc_t& file = files.front();
copy_file_content(out_fh, out_fn, file.ent);
copy_file_content(out_writer, out_fn, file.ent);
file_count += 1;
chunk_written += static_cast<unsigned long long int>(file.ent.size());
if(file.ask_for_work) {
......@@ -442,6 +436,7 @@ void worker(const char *out_fn)
/* this will usually induce a delay while caches are flushed */
timer_write.start(__LINE__);
out_writer.finalize();
int ierr_close = fclose(out_fh);
timer_write.stop(__LINE__);
if(ierr_close != 0) {
......@@ -475,7 +470,7 @@ static int find_unused_request(int count, MPI_Request *request)
return flag ? idx : -1;
}
static void copy_file_content(FILE *out_fh, const char *out_fn,
static void copy_file_content(async_writer& out_writer, const char *out_fn,
const tarentry &ent)
{
const size_t off = ent.get_offset();
......@@ -488,24 +483,17 @@ static void copy_file_content(FILE *out_fh, const char *out_fn,
static size_t file_off = 0;
timer_seek.start(__LINE__);
if(file_off != off) {
int ierr_seek = fseek(out_fh, (long)off, SEEK_SET);
if(ierr_seek == -1) {
fprintf(stderr, "Could not seek '%s' to %zu: %s\n", out_fn,
size_t(off), strerror(errno));
exit(1);
}
out_writer.seek(off);
file_off = off;
}
timer_seek.stop(__LINE__);
timer_write.start(__LINE__);
size_t written = fwrite(hdr.data(), 1, hdr.size(), out_fh);
void *data = malloc(hdr.size());
assert(data);
memcpy(data, static_cast<const void*>(hdr.data()), hdr.size());
out_writer.write(data, hdr.size());
timer_write.stop(__LINE__);
if(written != hdr.size()) {
fprintf(stderr, "Could not write %zu bytes to '%s': %s\n", hdr.size(),
out_fn, strerror(errno));
exit(1);
}
file_off += hdr.size();
if(!ent.is_reg())
return;
......@@ -521,9 +509,11 @@ static void copy_file_content(FILE *out_fh, const char *out_fn,
off_t size = (off_t)ent.get_filesize();
off_t offset = 0;
while(offset < size) {
static char fbuf[COPY_BLOCK_SIZE]; /* TODO: find an optimal number */
size_t blocksize = size_t(size-offset) > COPY_BLOCK_SIZE ? COPY_BLOCK_SIZE : (size-offset);
void *fbuf = malloc(blocksize);
assert(fbuf);
timer_read.start(__LINE__);
ssize_t read_sz = read(in_fd, fbuf, size_t(size-offset) > sizeof(fbuf) ? sizeof(fbuf) : (size-offset));
ssize_t read_sz = read(in_fd, fbuf, blocksize);
timer_read.stop(__LINE__);
if(read_sz == -1) {
fprintf(stderr, "Could not read from '%s': %s\n", in_fn, strerror(errno));
......@@ -531,13 +521,8 @@ static void copy_file_content(FILE *out_fh, const char *out_fn,
}
offset += read_sz;
timer_write.start(__LINE__);
size_t write_sz = fwrite(fbuf, 1, read_sz, out_fh);
out_writer.write(fbuf, read_sz);
timer_write.stop(__LINE__);
if(write_sz != size_t(read_sz)) {
fprintf(stderr, "Could not write %zu bytes to '%s': %s\n",
size_t(read_sz), out_fn, strerror(errno));
exit(1);
}
}
assert(offset == size);
file_off += size;
......@@ -546,13 +531,8 @@ static void copy_file_content(FILE *out_fh, const char *out_fn,
if(size % BLOCKSIZE) {
const size_t padsize = BLOCKSIZE - (size % BLOCKSIZE);
timer_write.start(__LINE__);
const size_t ierr_write = fwrite(block, 1, padsize, out_fh);
out_writer.write(block, padsize, NULL);
timer_write.stop(__LINE__);
if(ierr_write != padsize) {
fprintf(stderr, "Could not write %zu bytes to '%s': %s\n", padsize,
out_fn, strerror(errno));
exit(1);
}
file_off += padsize;
}
timer_open.start(__LINE__);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment