Commit 5bacd355 authored by Roland Haas's avatar Roland Haas
Browse files

mpitar: adapt to ring-buffered asyncwriter

parent 54243375
......@@ -75,11 +75,10 @@
#define MAX_JOBS_IN_FLIGHT 3
#define MAX_FILES_IN_JOB 100
#define TARGET_JOB_SIZE (1024ul*1024ul*1024ul)
#define COPY_BLOCK_SIZE (512ul*1024ul*1024ul)
#define STREAM_BUFFER_SIZE (COPY_BLOCK_SIZE)
#define RING_BUFFER_SIZE (512ul*1024ul*1024ul)
// the only file that the master writes is the index file, so the buffer does
// not have to be large
#define MASTER_STREAM_BUFFER_SIZE (10ul*1024ul*1024ul)
#define MASTER_RING_BUFFER_SIZE (10ul*1024ul*1024ul)
std::vector<timer*> timer::all_timers;
......@@ -157,26 +156,20 @@ int main(int argc, char **argv)
void master(const char *out_fn, fileentries& entries)
{
timer_open.start(__LINE__);
// I use this open/fdopen combo to be able to use fread but also have a
// WRONLY which avoid fseek() pre-reading into the buffer
int out_fd = open(out_fn, O_WRONLY | O_TRUNC | O_CREAT, 0666);
if(out_fd == -1) {
fprintf(stderr, "Could not open '%s' for writing: %s\n", out_fn,
strerror(errno));
exit(1);
}
FILE *out_fh = fdopen(out_fd, "wb");
if(out_fh == NULL) {
fprintf(stderr, "Could not open '%s' for writing: %s\n", out_fn,
strerror(errno));
exit(1);
}
timer_open.stop(__LINE__);
// I need this barrier so that all ranks wait until the last one has opened
// and truncated the file
// I cannot use r+ since this seems to make fseek reads in data into the buffer
// I need this barrier so that the workers wait opening the file until the
// master has created it
MPI_Barrier(MPI_COMM_WORLD);
async_writer out_writer(out_fh, MASTER_STREAM_BUFFER_SIZE);
// this async_writer is only used to write the final index file, so does not
// need a large buffer (or need ot be async really but this way I can use
// copy_file_content)
async_writer out_writer(out_fd, MASTER_RING_BUFFER_SIZE);
char idx_fn[1024];
size_t idx_fn_size = snprintf(idx_fn, sizeof(idx_fn), "%s.idx", out_fn);
......@@ -332,14 +325,29 @@ void master(const char *out_fn, fileentries& entries)
/* terminate tar file */
static char buffer[2*BLOCKSIZE];
out_writer.seek(off);
int ierr_seek = out_writer.seek(off);
if(ierr_seek != 0) {
fprintf(stderr, "Could not seek in '%s': %s\n", out_fn,
strerror(ierr_seek));
exit(1);
}
timer_write_master_write.start(__LINE__);
out_writer.write(buffer, 2*BLOCKSIZE, NULL);
int ierr_write = out_writer.write(buffer, 2*BLOCKSIZE);
if(ierr_write != 0) {
fprintf(stderr, "Could not write to '%s': %s\n", out_fn,
strerror(ierr_write));
exit(1);
}
timer_write_master_write.stop(__LINE__);
out_writer.finalize();
int ierr_async_close = out_writer.close();
if(ierr_async_close != 0) {
fprintf(stderr, "Could not write to '%s': %s\n", out_fn,
strerror(ierr_async_close));
exit(1);
}
timer_write_master_write.start(__LINE__);
int ierr_close = fclose(out_fh);
int ierr_close = close(out_fd);
timer_write_master_write.stop(__LINE__);
if(ierr_close != 0) {
fprintf(stderr, "Could not write to '%s': %s\n", out_fn,
......@@ -369,29 +377,17 @@ void worker(const char *out_fn)
// open output file
timer_open.start(__LINE__);
// I need this barrier so that all ranks wait until the last one has opened
// and truncated the file
// I cannot use r+ since this seems to make fseek in in data into the buffer
// this barrier is after/before the open call in master/worker
// I need this barrier so that worker ranks wait until the master has created
// the file
MPI_Barrier(MPI_COMM_WORLD);
// I use this open/fdopen combo to be able to use fread but also have a
// WRONLY which avoid fseek() pre-reading into the buffer
int out_fd = open(out_fn, O_WRONLY, 0666);
if(out_fd == -1) {
fprintf(stderr, "Could not open '%s' for writing: %s\n", out_fn,
strerror(errno));
exit(1);
}
FILE *out_fh = fdopen(out_fd, "wb");
if(out_fh == NULL) {
fprintf(stderr, "Could not open '%s' for writing: %s\n", out_fn,
strerror(errno));
exit(1);
}
timer_open.stop(__LINE__);
static char buffer[STREAM_BUFFER_SIZE];
setbuffer(out_fh, buffer, sizeof(buffer));
async_writer out_writer(out_fh, STREAM_BUFFER_SIZE);
async_writer out_writer(out_fd, RING_BUFFER_SIZE);
int done = 0;
do {
......@@ -455,9 +451,14 @@ void worker(const char *out_fn)
} while(!done || !files.empty());
/* this will usually induce a delay while caches are flushed */
out_writer.finalize();
int ierr_async_close = out_writer.close();
if(ierr_async_close != 0) {
fprintf(stderr, "Could not write to '%s': %s\n", out_fn,
strerror(ierr_async_close));
exit(1);
}
timer_write_master_write.start(__LINE__);
int ierr_close = fclose(out_fh);
int ierr_close = close(out_fd);
timer_write_master_write.stop(__LINE__);
if(ierr_close != 0) {
fprintf(stderr, "Could not write to '%s': %s\n", out_fn,
......@@ -503,7 +504,12 @@ static void copy_file_content(async_writer& out_writer, const char *out_fn,
static size_t file_off = 0;
timer_seek.start(__LINE__);
if(file_off != off) {
out_writer.seek(off);
int ierr_seek = out_writer.seek(off);
if(ierr_seek != 0) {
fprintf(stderr, "Could not seek in '%s': %s\n", out_fn,
strerror(ierr_seek));
exit(1);
}
file_off = off;
}
timer_seek.stop(__LINE__);
......@@ -511,7 +517,12 @@ static void copy_file_content(async_writer& out_writer, const char *out_fn,
void *data = malloc(hdr.size());
assert(data);
memcpy(data, static_cast<const void*>(hdr.data()), hdr.size());
out_writer.write(data, hdr.size());
int ierr_write_hdr = out_writer.write(data, hdr.size());
if(ierr_write_hdr != 0) {
fprintf(stderr, "Could not write to '%s': %s\n", out_fn,
strerror(ierr_write_hdr));
exit(1);
}
file_off += hdr.size();
if(!ent.is_reg())
return;
......@@ -525,28 +536,21 @@ static void copy_file_content(async_writer& out_writer, const char *out_fn,
exit(1);
}
off_t size = (off_t)ent.get_filesize();
off_t offset = 0;
while(offset < size) {
size_t blocksize = size_t(size-offset) > COPY_BLOCK_SIZE ? COPY_BLOCK_SIZE : (size-offset);
void *fbuf = malloc(blocksize);
assert(fbuf);
timer_read.start(__LINE__);
ssize_t read_sz = read(in_fd, fbuf, blocksize);
timer_read.stop(__LINE__);
if(read_sz == -1) {
fprintf(stderr, "Could not read from '%s': %s\n", in_fn, strerror(errno));
exit(1);
}
offset += read_sz;
out_writer.write(fbuf, read_sz);
int ierr_copy = out_writer.copy(in_fd, size);
if(ierr_copy) {
fprintf(stderr, "Could not copy '%s': %s\n", in_fn, strerror(ierr_copy));
exit(1);
}
assert(offset == size);
file_off += size;
static char block[BLOCKSIZE]; /* bunch of zeros for padding to block size */
if(size % BLOCKSIZE) {
const size_t padsize = BLOCKSIZE - (size % BLOCKSIZE);
out_writer.write(block, padsize, NULL);
int ierr_write_tail = out_writer.write(block, padsize);
if(ierr_write_tail) {
fprintf(stderr, "Could not write to '%s': %s\n", out_fn, strerror(ierr_write_tail));
exit(1);
}
file_off += padsize;
}
timer_open.start(__LINE__);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment