Commit 69c541b5 authored by Aaron Saxton's avatar Aaron Saxton
Browse files

handling missing metric files in ingest

parent 8f284281
......@@ -14,6 +14,7 @@ else:
if not has_mpi:
print('No MPI')
import sys
import math
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
......@@ -47,11 +48,11 @@ TEST=False
if has_mpi:
print("has_mpi, nprocs {}, rank {}".format(nprocs, comm.rank))
print_move_line = nprocs - comm.rank + 1
status_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + [' {}>line_count {}, %{}, lines/sec {:.3f}, file {}, offset {}, blkErrCount {}'] + ['\n']*print_move_line + ['\r'])
status_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + [' {}>{}>line_count {}, %{}, lines/sec {:.3f}, file {}, offset {}, blkErrCount {}'] + ['\n']*print_move_line + ['\r'])
found_eof_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + [' {}>Found EOF, file {}, offset {}'] + ['\n']*print_move_line + ['\r'])
found_eof_wait_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + [' {}>Found EOF, waiting, file {}, offset {}'] + ['\n']*print_move_line + ['\r'])
at_barr_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + ['>At Barrier'] + ['\n']*print_move_line + ['\r'])
no_top_status_str = ' {}>line_count {}, %{}, lines/sec {:.3f}, file {}, offset {}, blkErrCount {}'
no_top_status_str = ' {}>{}>line_count {}, %{}, lines/sec {:.3f}, file {}, offset {}, blkErrCount {}'
no_top_found_eof_str = ' {}>Found EOF, file {}, offset {}'
no_top_found_eof_wait_str = ' {}>Found EOF, waiting, file {}, offset {}'
no_top_at_barr_str = ''.join([str(comm.rank)] + ['>At Barrier'])
......@@ -82,7 +83,14 @@ def backOffGen(init, rate, maxStep):
yield init*math.exp(rate*step)
raise backOffException("Querry Failed too many times")
def does_file_exist(f_name):
    """Return True if *f_name* exists (is stat-able), False if it is missing.

    Any OSError other than FileNotFoundError (e.g. PermissionError on a
    parent directory) propagates to the caller unchanged.
    """
    try:
        os.stat(f_name)
        return True
    except FileNotFoundError:
        return False
def load_block_from_file(f_path_name, db_host_name, db_host_port, f_blk_size=None, f_blk_offset=None, top=False ):
client = MongoClient(db_host_name, db_host_port, username="admin", password="admin")
db = client.get_database(database_name)
......@@ -112,6 +120,7 @@ def load_block_from_file(f_path_name, db_host_name, db_host_port, f_blk_size=Non
lines_p_sec = None
num_char_tot = 0
for line_count, l in enumerate(f):
num_char_tot += len(l)
if TEST and int(line_count / DOC_BLOCK_SIZE) == 3:
......@@ -125,10 +134,15 @@ def load_block_from_file(f_path_name, db_host_name, db_host_port, f_blk_size=Non
split_str = l.split(',')
concat_t_id=split_str[0]+split_str[2]
doc = {k: t(v.strip()) for k, t, v in zip(h_str_list, h_type_list, split_str)}
doc['k_to_h'] = concat_t_id
doc['#TimeTrunSec'] = int(doc['#Time'])
metricData_list.append(doc)
try:
doc = {k: t(v.strip()) for k, t, v in zip(h_str_list, h_type_list, split_str)}
except ValueError as e:
print("Handeling ValueError, skipping doc in file {}. Details {}".format(f_path_name,
e.with_traceback(sys.exc_info()[2])))
else:
doc['k_to_h'] = concat_t_id
doc['#TimeTrunSec'] = int(doc['#Time'])
metricData_list.append(doc)
if (line_count % DOC_BLOCK_SIZE == 0 and line_count != 0) or DOC_BLOCK_SIZE == 1:
if DOC_BLOCK_SIZE == 1:
resp = metricData.insert_one(metricData_list[0])
......@@ -168,12 +182,12 @@ def load_block_from_file(f_path_name, db_host_name, db_host_port, f_blk_size=Non
pct_done = int(100.*(f_blk_offset+num_char_tot)/float(f_tot_size))
if resp is not None:
if top:
print(status_str.format(db_host_name, line_count,
print(status_str.format(db_host_name, comm.rank, line_count,
pct_done, lines_p_sec,
f_path_name.split('/')[-1], f_blk_offset,
load_block_from_file.BulkErrorCounter), end='')
elif line_count % (DOC_BLOCK_SIZE*16) == 0:
print(no_top_status_str.format(db_host_name, line_count,
print(no_top_status_str.format(db_host_name, comm.rank, line_count,
pct_done, lines_p_sec,
f_path_name.split('/')[-1], f_blk_offset,
load_block_from_file.BulkErrorCounter))
......@@ -224,6 +238,10 @@ if __name__== "__main__":
args = parser.parse_args()
data_list_str = gen_date_list_str(args.dateRange[0], args.dateRange[1])
# need to check that all these files exists ^^^^
candidate_metric_files = [ospJoin(monitoring_data_path, d) for d in data_list_str]
_cmf_mask = [does_file_exist(cf) for cf in candidate_metric_files]
data_list_str = [d for m, d in zip(_cmf_mask, data_list_str) if m ]
hosts = get_hosts_ports_of_cluster_from_conf_filename(args.starter_hosts,
args.dateRange[0],
args.dateRange[1],
......@@ -322,12 +340,19 @@ if __name__== "__main__":
break
m_file_name = data_list_str[do_data[0]]
m_file_path = ospJoin(monitoring_data_path, m_file_name)
EOF, line_count = load_block_from_file(m_file_path,
mongodb_host_name,
mongodb_host_port,
f_blk_size=DOC_BLOCK_PROC_SIZE,
f_blk_offset=do_data[1],
top=args.top)
try:
EOF, line_count = load_block_from_file(m_file_path,
mongodb_host_name,
mongodb_host_port,
f_blk_size=DOC_BLOCK_PROC_SIZE,
f_blk_offset=do_data[1],
top=args.top)
except Exception as e:
print("Handeling Exception From load_block_from_file, skipping block in file {}. Details {}".format(do_data[1], m_file_path,
e.with_traceback(sys.exc_info()[2])))
EOF=False
line_count=0
tock = time.time()
if EOF:
finish_data[0] = 1
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment