Commit 6a2da227 authored by Roland Haas's avatar Roland Haas
Browse files

mpitar: add comments try to make code clearer

parent aa0b8c02
......@@ -171,7 +171,7 @@ void master(const char *out_fn, fileentries& entries)
timer_open.stop(__LINE__);
// I need this barrier so that all ranks wait until the last one has opened
// and truncated the file
// I cannot use r+ since this seems to make fseek in in data into the buffer
// I cannot use r+ since this seems to make fseek reads in data into the buffer
MPI_Barrier(MPI_COMM_WORLD);
async_writer out_writer(out_fh);
......@@ -218,6 +218,7 @@ void master(const char *out_fn, fileentries& entries)
for(int r = 0 ; count != MPI_UNDEFINED && r < count ; r++) {
int idx = recv_completed[r];
show_progress(off, size_t(recv_buffers[idx]), 0);
int w = status[r].MPI_SOURCE;
jobs_in_flight[w] -= 1;
}
......@@ -232,6 +233,7 @@ void master(const char *out_fn, fileentries& entries)
assert(send_id >= 0);
send_buffers[buf0num+send_id].clear();
assert(send_buffers[buf0num+send_id].size() == 0);
/* make a job package for a worker containing up to MAX_FILES_IN_JOB
* files and aiming to be at least TARGET_JOB_SIZE bytes worth of files
* */
......@@ -245,21 +247,24 @@ void master(const char *out_fn, fileentries& entries)
done = 1;
break;
}
timer_stat.start(__LINE__);
tarentry ent(fn, off);
timer_stat.stop(__LINE__);
const size_t sz = ent.size();
job_sz += sz;
//printf("%s (%zu bytes)\n", fn, sz);
/* enough room for
* filename, \0, decimal file size, \0, decimal offset into output, \0
* using 20 decimal digits is enough for 64 bit numbers */
send_buffers.at(buf0num+send_id) += ent.serialize();
timer_write_index.start(__LINE__);
fprintf(idx_fh, "%zu %s\n", off, fn.c_str());
timer_write_index.stop(__LINE__);
off += sz;
}
/* send package to worker */
if(!send_buffers[buf0num+send_id].empty()) {
/* prepare for "done" message from worker */
timer_master_wait.start(__LINE__);
......@@ -270,6 +275,7 @@ void master(const char *out_fn, fileentries& entries)
current_worker, send_id, MPI_COMM_WORLD,
&send_requests[buf0num+send_id]);
timer_master_wait.stop(__LINE__);
jobs_in_flight[current_worker] += 1;
}
}
......@@ -304,7 +310,7 @@ void master(const char *out_fn, fileentries& entries)
show_progress(off, off-current, 0); /* show 100% written */
printf("\n");
/* add index file to tar */
/* add index file to tarball */
timer_write_index.start(__LINE__);
fprintf(idx_fh, "%zu %s\n", off, idx_fn);
int ierr_fclose = fclose(idx_fh);
......@@ -314,6 +320,7 @@ void master(const char *out_fn, fileentries& entries)
exit(1);
}
timer_write_index.stop(__LINE__);
timer_stat.start(__LINE__);
tarentry idx_ent(idx_fn, off);
timer_stat.stop(__LINE__);
......@@ -327,6 +334,7 @@ void master(const char *out_fn, fileentries& entries)
out_writer.write(buffer, 2*BLOCKSIZE, NULL);
timer_write_master_write.stop(__LINE__);
out_writer.finalize();
timer_write_master_write.start(__LINE__);
int ierr_close = fclose(out_fh);
timer_write_master_write.stop(__LINE__);
......@@ -335,6 +343,7 @@ void master(const char *out_fn, fileentries& entries)
strerror(errno));
exit(1);
}
printf("Done.\n");
timer_master_wait.start(__LINE__);
......@@ -345,9 +354,9 @@ void master(const char *out_fn, fileentries& entries)
/* make this a non-local type to make the compiler happy */
struct filedesc_t {
tarentry ent;
int ask_for_work;
bool ask_for_work;
int tag;
filedesc_t(tarentry ent_, int ask_for_work_, int tag_) :
filedesc_t(tarentry ent_, bool ask_for_work_, int tag_) :
ent(ent_), ask_for_work(ask_for_work_), tag(tag_) {};
};
void worker(const char *out_fn)
......@@ -355,6 +364,7 @@ void worker(const char *out_fn)
std::queue<filedesc_t> files;
int file_count = 0;
// open output file
timer_open.start(__LINE__);
// I need this barrier so that all ranks wait until the last one has opened
// and truncated the file
......@@ -382,6 +392,8 @@ void worker(const char *out_fn)
int done = 0;
do {
/* check for any new work package from master, wait only if we have nothing
* else to do */
MPI_Status status;
int count, flag;
if(files.empty()) {
......@@ -395,6 +407,7 @@ void worker(const char *out_fn)
timer_worker_wait.stop(__LINE__);
}
if(flag) {
/* data is waiting, get it */
timer_worker_wait.start(__LINE__);
MPI_Get_count(&status, MPI_BYTE, &count);
std::vector<char> recv_buffer(count);
......@@ -402,17 +415,18 @@ void worker(const char *out_fn)
MPI_Recv(&recv_buffer[0], count, MPI_BYTE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
timer_worker_wait.stop(__LINE__);
int first = 1;
for(char *p = &recv_buffer[0], *s = p ; p - s < (ptrdiff_t)recv_buffer.size() ; ) {
/* parse buffer into file entries */
for(size_t ind = 0, sz ; ind < recv_buffer.size() ; ind += sz) {
tarentry ent;
p += ent.deserialize(p);
sz = ent.deserialize(&recv_buffer[ind]);
/* magic empty file name for end of work? */
if(ent.get_filename().empty()) {
done = 1;
break;
}
files.push(filedesc_t(ent, first, tag));
first = 0;
/* request more work from master when processing first entry */
files.push(filedesc_t(ent, ind==0, tag));
}
}
......@@ -425,6 +439,7 @@ void worker(const char *out_fn)
copy_file_content(out_writer, out_fn, file.ent);
file_count += 1;
chunk_written += static_cast<unsigned long long int>(file.ent.size());
if(file.ask_for_work) {
timer_worker_wait.start(__LINE__);
MPI_Send(&chunk_written, 1, MPI_UNSIGNED_LONG_LONG, 0, file.tag, MPI_COMM_WORLD);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment