Commit 8c8d5e37 authored by Aaron Saxton

added error handling when no metric files exist, datasets based on bw project id, and other maintenance
parent b241ebd2
*~
*.pyc
mongo_sitevar_20*.sh
*.log
*.o11*
*.e11*
*.o10*
*.e10*
*.o9*
*.e9*
*.txt
\ No newline at end of file
......@@ -38,6 +38,20 @@ mongo_sitevar_201611.sh
mongo_sitevar_201612.sh
"
EX_LIST_OF_CONF_FILES="mongo_sitevar_201601.sh
mongo_sitevar_201602.sh
mongo_sitevar_201603.sh
mongo_sitevar_201604.sh
mongo_sitevar_201605.sh
mongo_sitevar_201606.sh
mongo_sitevar_201607.sh
mongo_sitevar_201608.sh
mongo_sitevar_201609.sh
mongo_sitevar_201610.sh
mongo_sitevar_201611.sh
mongo_sitevar_201612.sh"
EX_LIST_OF_CONF_FILES="mongo_sitevar_201901.sh
mongo_sitevar_201902.sh
mongo_sitevar_201903.sh
......
......@@ -30,6 +30,7 @@ import os
import re
from os.path import join as ospJoin
monitoring_data_path = "/projects/monitoring_data/ovis"
monitoring_data_path = "/mnt/b/projects/sciteam/gjo/share/node_metrics/cray_system_sampler"
#monitoring_data_path = "/mnt/a/u/staff/saxton/Development/hpcmongodb/ingest/moc_data"
f_name_header = 'HEADER.20160115-'
database_name = 'monitoringData'
......@@ -242,6 +243,9 @@ if __name__== "__main__":
candidate_metric_files = [ospJoin(monitoring_data_path, d) for d in data_list_str]
_cmf_mask = [does_file_exist(cf) for cf in candidate_metric_files]
data_list_str = [d for m, d in zip(_cmf_mask, data_list_str) if m ]
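# If every candidate metric file was filtered out above, abort immediately
# rather than dispatching an empty work list to the MPI ranks below.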
num_metric_files = len(data_list_str)
if num_metric_files == 0:
raise Exception("No metric files found at {}. Exiting".format(monitoring_data_path))
hosts = get_hosts_ports_of_cluster_from_conf_filename(args.starter_hosts,
args.dateRange[0],
args.dateRange[1],
......@@ -254,8 +258,6 @@ if __name__== "__main__":
finish_data = [0,0,0,0] #np.array([0,0], dtype='i')
if comm.rank == 0:
#print(''.join(['\n']*(print_move_line+1)+['\r']), end='')
date_list_str = gen_date_list_str(*args.dateRange)
num_metric_files = len(data_list_str)
progress_table = {k: {"offset" : 0, "index": i} for i, k in enumerate(data_list_str)}
comm.barrier()
#initial dispatch
......@@ -348,8 +350,8 @@ if __name__== "__main__":
f_blk_offset=do_data[1],
top=args.top)
except Exception as e:
print("Handeling Exception From load_block_from_file, skipping block in file {}. Details {}".format(do_data[1], m_file_path,
e.with_traceback(sys.exc_info()[2])))
print("Handeling Exception From load_block_from_file, skipping block in file {} at {}. Details {}".format(m_file_path, do_data[1],
e.with_traceback(sys.exc_info()[2])))
EOF=False
line_count=0
......
......@@ -23,8 +23,19 @@ import random
from pyhpcmongodb.ingest.run import get_hosts_ports_of_cluster_from_conf_filename
torq_dir = "/u/staff/saxton/torque_data/data/bw/torque"
torq_file_name_template="TEMP-{}"
torq_dir = "/u/staff/saxton/torque_data/data/var"
torq_file_name_template="{}"
database_name = 'monitoringData'
def does_file_exist(f_name):
try:
os.stat(f_name)
except FileNotFoundError:
return False
else:
return True
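# Note: os.path.exists(f_name) is a near-equivalent check (it additionally
# swallows permission errors); the stat/except form above only treats a
# missing file as absent.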
def gen_date_list_str(D1, D2):
numdays = (D2 - D1).days
date_list = [D1 + datetime.timedelta(days=x) for x in range(0, numdays)]
......@@ -48,8 +59,15 @@ def ingest_torque(D1, D2, hosts):
torqueData = db.torqueData
for f_name in date_list:
print('f_name {}'.format(f_name))
with open(os.path.join(torq_dir, "TEMP-{}".format(f_name))) as f:
f_name_path = os.path.join(torq_dir, torq_file_name_template.format(f_name))
print('f_name {}'.format(f_name_path))
if does_file_exist(f_name_path):
pass
else:
print('f_name {} does not exist, skipping'.format(f_name_path))
continue
with open(f_name_path) as f:
d = [l.strip() for l in f]
doc_keys = ["dateTime", 'action', 'jobid', 'data']
......
......@@ -403,9 +403,6 @@ def get_agrigate_job_metrics(jobid, metric_name, host=None, db=None):
for m in job_metricData]
return job_metricData, explain_dict
'''
'''
def jobs_strata_metrics(start_d, end_d, min_num_nodes, max_num_nodes,
min_duration, max_duration,
......@@ -565,7 +562,77 @@ def jobs_strata(start_d, end_d, min_num_nodes, max_num_nodes,
jobs_nc_strata[nc] = a[lower:upper]
return jobs_nc_strata
def jobs_from_group_metrics(group, delta_step=None, hosts_ports=None, dbs=None,
index=0, num_workers=1, limit=None, offset=None):
avail_jobs_from_group = jobs_from_group(group, hosts_ports, dbs, limit, offset)
for job_count, job in enumerate(avail_jobs_from_group):
if job_count % num_workers != index:
continue
jobid = job['jobid']
nc = int(job['data']['unique_node_count'])
delta_index = 0
while True:
tick = time.time()
try:
try:
jobs_data_generator, t_min, t_max = get_job_metrics(jobid, dbs=dbs, delta_step=delta_step,
delta_index=delta_index)
except endOfJobMetricData:
break
except (poorlyFormedJobData, OperationFailure, emptyMetricList) as e:
if type(e) == OperationFailure:
print('get_job_metrics raised Operations Failure for jobid {}. skipping. Message: {}'.format(jobid, str(e)))
elif type(e) == poorlyFormedJobData:
print('in jobs_strata_metrics, poorlyFormedJobData for jobid {}. skipping. Message: {}'.format(jobid, str(e)))
elif type(e) == emptyMetricList:
print('in jobs_strata_metrics, emptyMetricList for jobid {}. skipping. Message: {}'.format(jobid, str(e)))
break
tock = time.time()
print(
"num_node: {}, "
"jobid: {}, "
"query time (sec): {:.3f}, "
"delta_step: {}, "
"delta_index: {}".format(nc, jobid, (tock-tick),
delta_step, delta_index)
)
delta_index += 1
yield jobid, jobs_data_generator, t_min, t_max
def jobs_from_group(group, hosts_ports=None, dbs=None, limit=None, offset=None):
if hosts_ports is not None:
_dbs = []
for host_port in hosts_ports:
host, port = host_port.split(':')
port = int(port)
client = MongoClient(host, port, username="admin", password="admin")
db = client.get_database(database_name)
_dbs.append(db)
elif dbs is not None:
_dbs=dbs
else:
raise Exception('Must pass host or db (collection).')
torqueDataCollections = [db.torqueData for db in _dbs]
query = {
'action': "E",
'data.Exit_status': '0',
'data.account' : group
}
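# The query above keeps Torque 'E' (job-end) records with Exit_status '0',
# i.e. jobs that completed successfully under the given project/group account.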
len_resp_list = constelation_find_estimated_document_count_sum(torqueDataCollections, query)
print("{} jobs in group {}".format(len_resp_list,group))
avail_jobs_from_group = constelation_find_append(
torqueDataCollections,
query
)
return avail_jobs_from_group
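# A minimal usage sketch for the generator above (illustrative only; the
# "bw_jobs" group and the localhost host:port are placeholders, not from this
# repository):
#
#   gen = jobs_from_group_metrics("bw_jobs", delta_step=3600,
#                                 hosts_ports=["localhost:27017"],
#                                 index=0, num_workers=1, limit=10)
#   for jobid, job_metrics, t_min, t_max in gen:
#       ...  # one delta_step-sized window of metric data per iteration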
def make_job_cube(data_from_mongo, data_cube_t_length=60,
feature_major_order=False, key_length=None,
t_min=None, t_max=None, jobid=None, normalize=False):
......
from pyhpcmongodb.query.get import get_constelation_hosts
from pyhpcmongodb.query.get import make_job_cube
from pyhpcmongodb.query.get import jobs_strata_metrics
from pyhpcmongodb.query.get import jobs_from_group_metrics
from pyhpcmongodb.query.get import get_job_metrics
from pyhpcmongodb.query.get import EXPECTED_NUM_KEY_METRICS
from pyhpcmongodb.query.get import DATA_CUBE_DIM
......@@ -30,10 +31,12 @@ def build_strata_dataset(start_d, end_d,
dbs=dbs, delta_step=delta_step)
zero_cube_chunk = [make_job_cube(None, data_cube_t_length=data_cube_t_length,
key_length=EXPECTED_NUM_KEY_METRICS)]
yield [-1], zero_cube_chunk, ['noJob'.encode('utf-8')]
prev_jobid = None
chunk_i=0
for jobid, jm, t_min, t_max in jobs_strata_gen:
if prev_jobid != jobid:
yield [-1], zero_cube_chunk, ['noJob'.encode('utf-8')]
prev_jobid = jobid
tick = time.time()
try:
d_cube = make_job_cube(jm, data_cube_t_length=data_cube_t_length, jobid=jobid,
......@@ -50,7 +53,56 @@ def build_strata_dataset(start_d, end_d,
# list(itertools.zip_longest(list(chunk_up(np.arange(10), 3)), list(chunk_up(np.arange(17), 3))))
for chunk_i, batch_chunk in enumerate(d_cube_chunked):
yield [chunk_i], [batch_chunk], [jobid.encode('utf-8')]
yield [-1], zero_cube_chunk, ["noJob".encode('utf-8')]
yield [-1], zero_cube_chunk, ["noJob".encode('utf-8')]
data_shape = [1, data_cube_t_length] + 3*[DATA_CUBE_DIM] + [EXPECTED_NUM_KEY_METRICS]
# data_shape = [None]
dataset = tf.data.Dataset.from_generator(
job_metric_gen,
(tf.int32, tf.float32, tf.string),
(tf.TensorShape([1]),
tf.TensorShape(data_shape),
tf.TensorShape([1]))
)
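# Each dataset element is a tuple: an int32 chunk index of shape [1], a
# float32 cube of shape [1, data_cube_t_length, DATA_CUBE_DIM, DATA_CUBE_DIM,
# DATA_CUBE_DIM, EXPECTED_NUM_KEY_METRICS], and a [1] string jobid
# (b'noJob' for the padding cubes yielded between jobs).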
dataset = dataset.prefetch(4)
return dataset
def build_dataset_from_group(group, limit, index=0,
num_workers=1, data_cube_t_length=60,
offset=None, shuffle=False,
hosts_ports=None, dbs=None):
delta_step=data_cube_t_length*60
def job_metric_gen():
jobs_from_group_gen = jobs_from_group_metrics(group, delta_step, hosts_ports, dbs, index, num_workers, limit, offset)
zero_cube_chunk = [make_job_cube(None, data_cube_t_length=data_cube_t_length,
key_length=EXPECTED_NUM_KEY_METRICS)]
prev_jobid = None
chunk_i=0
for jobid, jm, t_min, t_max in jobs_from_group_gen:
print("getting jobid {} metadata. t_min {}, t_max {}".format(jobid, t_min, t_max))
if prev_jobid != jobid:
yield [-1], zero_cube_chunk, ['noJob'.encode('utf-8')]
prev_jobid = jobid
tick = time.time()
try:
d_cube = make_job_cube(jm, data_cube_t_length=data_cube_t_length, jobid=jobid,
t_min=t_min, t_max=t_max, normalize=True)
except StopIteration:
print('jobid {} has zero length, skipping'.format(jobid))
continue
tock = time.time()
print("rank {}> jobid {}, make_job_cube wall clock time (sec): {:.3f}".format(index, jobid, tock-tick))
d_cube_chunked = chunk_up(d_cube, data_cube_t_length)
#fillvalue=zero_cube_batch)
# the following is what we are shooting for
# list(itertools.zip_longest(list(chunk_up(np.arange(10), 3)), list(chunk_up(np.arange(17), 3))))
for chunk_i, batch_chunk in enumerate(d_cube_chunked):
yield [chunk_i], [batch_chunk], [jobid.encode('utf-8')]
yield [-1], zero_cube_chunk, ["noJob".encode('utf-8')]
data_shape = [1, data_cube_t_length] + 3*[DATA_CUBE_DIM] + [EXPECTED_NUM_KEY_METRICS]
# data_shape = [None]
......@@ -64,6 +116,7 @@ def build_strata_dataset(start_d, end_d,
dataset = dataset.prefetch(3)
return dataset
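# A minimal consumption sketch (illustrative only; the group name, limit and
# host:port are placeholders; assumes TF 2.x eager execution, under TF 1.x
# use a one-shot iterator and a session instead):
#
#   ds = build_dataset_from_group("bw_jobs", limit=10,
#                                 hosts_ports=["localhost:27017"])
#   for chunk_i, cube, jobid in ds:
#       print(int(chunk_i), cube.shape, jobid.numpy())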
def build_dataset_from_jobid_list(job_ids, host=None, db=None):
def job_metric_gen():
......
......@@ -14,7 +14,7 @@ export MONGO_BIN=/opt/mongodb/4.0.5/bin
export MONGO_BASE_DIR=/u/staff/saxton/scratch/hpcmongodb/metric_store_${SHARD_SVRS_NUMBER_OF_INSTANCES}_shards_${START_DATE}_${END_DATE}
export MONGO_TMP=${MONGO_BASE_DIR}
export USE_MEMORY_AS_DISK=true
export READ_ONLY=true
export READ_ONLY=false
export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
sh.shardCollection( \"monitoringData.metricData\", { k_to_h : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.metricData\")
......@@ -26,6 +26,6 @@ export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
monitoringData.torqueData.createIndex({dateTime: 1})
sh.disableAutoSplit()
config = db.getSiblingDB(\"config\")
config.settings.save( { _id:\"chunksize\", value: 2048 } )"
config.settings.save( { _id:\"chunksize\", value: 1024 } )"
SRC_DIR="/mnt/a/u/staff/saxton/Development/hpcmongodb" # "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"