Commit b241ebd2 authored by Aaron Saxton

made data pipeline between mongo query and model data a true generator for memory efficiency

parent 69c541b5
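The diff below is easiest to read against a small standalone sketch of the pattern it applies (names here are illustrative, not the repo's schema): the old helpers materialized every matching Mongo document into a Python list, while the generator versions stream documents off the pymongo cursors one at a time, so peak memory no longer scales with the size of the result set.

from pymongo import MongoClient  # assumption: same client library as the repo

def fetch_as_list(collections, query):
    # old pattern: every document is resident in memory at once
    return [doc for coll in collections for doc in coll.find(query)]

def fetch_as_generator(collections, query):
    # new pattern: documents are pulled lazily, one cursor batch at a time
    for coll in collections:
        for doc in coll.find(query):
            yield doc

# A consumer can aggregate without materializing the stream, e.g.:
#   n_docs = sum(1 for _ in fetch_as_generator(colls, {"action": "E"}))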
import argparse
from pymongo import MongoClient
from pymongo import DESCENDING
from pymongo import ASCENDING
from pymongo.errors import OperationFailure
import pymongo
from datetime import datetime as dt
import random
@@ -116,6 +119,9 @@ metric_fields = ['cTime', 'cTime_usec', 'DT', 'DT_usec', 'CompId',
class emptyJobList(Exception):
pass
class emptyMetricList(Exception):
pass
class tooManyMetrics(Exception):
pass
@@ -146,11 +152,41 @@ def get_node_intervals(exec_host):
return chunks
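# get_node_intervals is elided above except for its return value; a common
# way to build such [lo, hi] chunks from integer node ids (an assumption,
# inferred from the CompId $gte/$lte query constructed from them below):
def node_ids_to_intervals(node_ids):
    ids = sorted(set(node_ids))
    chunks = []
    for nid in ids:
        if chunks and nid == chunks[-1][1] + 1:
            chunks[-1][1] = nid        # extend the current contiguous run
        else:
            chunks.append([nid, nid])  # start a new run
    return chunks
# node_ids_to_intervals([1, 2, 3, 7, 8]) -> [[1, 3], [7, 8]]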
def constelation_find_append(dbs_collection, query):
-    return [ res for db_c in dbs_collection for res in db_c.find(query)] #.batch_size(200000)
+    for db_c in dbs_collection:
+        for res in db_c.find(query):
+            yield res
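# Usage sketch for the generator version of constelation_find_append above
# (assumes pymongo Collection objects; the query is illustrative): peeking at
# the first document no longer runs the whole query to completion.
def first_and_count(collections, query):
    gen = constelation_find_append(collections, query)
    try:
        first = next(gen)              # fetches only the first cursor batch
    except StopIteration:
        return None, 0
    return first, 1 + sum(1 for _ in gen)  # single-pass, O(1)-memory count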
def constelation_find_min_max(dbs_collection, query, field):
res_max = []
res_min = []
for db_c in dbs_collection:
try:
_n = db_c.find(query, {field:1}).sort(field, DESCENDING).limit(1).next()
except StopIteration:
pass
else:
res_max.append( _n[field])
try:
_n = db_c.find(query, {field:1}).sort(field, ASCENDING).limit(1).next()
except StopIteration:
pass
else:
res_min.append( _n[field])
if len(res_max) == 0 or len(res_min) == 0:
raise emptyMetricList("res_min {}, res_max {}".format(res_min, res_max))
return min(res_min), max(res_max)
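# constelation_find_min_max above pushes the work server-side: sort() plus
# limit(1) returns a single document per collection, so the global time range
# is found without streaming metric documents to the client. For counting
# (the helper below), note that Cursor.count() is deprecated and removed in
# PyMongo 4.0, and estimated_document_count() lives on Collection, not
# Cursor; a forward-compatible sketch of the same sum-over-shards idea:
def constellation_count_documents_sum(dbs_collection, query):
    return sum(db_c.count_documents(query) for db_c in dbs_collection)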
-def get_job_metrics(jobid, hosts_ports=None, dbs=None,
-                    delta_step=None, delta_index=None):
+#return [ res for db_c in dbs_collection for res in db_c.find(query)] #.batch_size(200000)
+def constelation_find_estimated_document_count_sum(dbs_collection, query):
+    #return sum([ db_c.find(query).estimated_document_count() for db_c in dbs_collection]) #.batch_size(200000)
+    return sum([ db_c.find(query).count() for db_c in dbs_collection]) #.batch_size(200000)
+def get_auto_nomalized_job_metrics(jobid, hosts_ports=None, dbs=None,
+                                   delta_step=None, delta_index=None):
_dbs = []
if hosts_ports is not None:
for host_port in hosts_ports:
@@ -165,10 +201,10 @@ def get_job_metrics(jobid, hosts_ports=None, dbs=None,
torqueDataCollections = [db.torqueData for db in _dbs]
metricDataCollections = [db.metricData for db in _dbs]
-    resp_list = constelation_find_append(torqueDataCollections, {"jobid": jobid, 'action': 'E'})
+    len_resp_list = constelation_find_estimated_document_count_sum(torqueDataCollections, {"jobid": jobid, 'action': 'E'})
+    resp_list = list(constelation_find_append(torqueDataCollections, {"jobid": jobid, 'action': 'E'}))
-    if len(resp_list) > 1 or len(resp_list) == 0:
+    if len_resp_list > 1 or len_resp_list == 0:
actions = [r['action'] for r in resp_list]
raise poorlyFormedJobData("Something is wrong, "
"job {} data has multiple or 0 "
@@ -206,8 +242,10 @@
#time_or_dict = {"#TimeTrunSec": {"$gte": int(_start), "$lt": int(_end)}}
time_or_dict = {"#Time": {"$gte": _start, "$lt": _end}}
find_dict = {"$and":[time_or_dict, node_or_dict]}
-    job_metricData = constelation_find_append(metricDataCollections, find_dict)
+    #est_count = constelation_find_estimated_document_count_sum(metricDataCollections, find_dict)
+    #print("jobid {} has estimated doc count of {}. Starting find query".format(jobid, est_count))
+    job_metricData = list(constelation_find_append(metricDataCollections, find_dict))
job_metricData_keys = sorted(set([k for m in job_metricData for k in m.keys()]))
job_metricData_keys = [k for k in job_metricData_keys if k not in exclude_keys]
@@ -230,6 +268,73 @@
return job_metricData, explain_dict
def get_job_metrics(jobid, hosts_ports=None, dbs=None,
delta_step=None, delta_index=None):
_dbs = []
if hosts_ports is not None:
for host_port in hosts_ports:
            host, port = host_port.split(':')
            port = int(port)  # split() yields a str; MongoClient needs an int port
            client = MongoClient(host, port, username="admin", password="admin")
db = client.get_database(database_name)
_dbs.append(db)
elif dbs is not None:
_dbs = dbs
else:
raise Exception('Must pass host, or db (collection).')
torqueDataCollections = [db.torqueData for db in _dbs]
metricDataCollections = [db.metricData for db in _dbs]
resp_list = list(constelation_find_append(torqueDataCollections, {"jobid": jobid, 'action': 'E'}))
if len(resp_list) > 1 or len(resp_list) == 0:
actions = [r['action'] for r in resp_list]
        raise poorlyFormedJobData("Something is wrong, "
                                  "job {} data has multiple or 0 "
                                  "results from r['action']=='E'\n"
                                  "Only actions are {}".format(jobid, actions))
if 'data' not in resp_list[0]:
raise poorlyFormedJobData("Something is wrong, "
"Job {} has no 'data' field. "
"These are the fields it has {}".format(jobid, list(resp_list[0].keys())))
missing_data_fields = [field_not_exist for field_not_exist in ['start', 'end', 'exec_host'] if field_not_exist not in resp_list[0]['data']]
if len(missing_data_fields) > 0:
raise poorlyFormedJobData("Something is wrong, "
"Job {} has no {} fields. "
"These are the fields it has {}".format(jobid,
missing_data_fields,
list(resp_list[0]['data'].keys())))
start,end,exec_host = resp_list[0]['data']['start'], resp_list[0]['data']['end'], resp_list[0]['data']['exec_host']
# delta_step=None, delta_index=None
if delta_step is not None:
_start = delta_step*delta_index + int(start)
__end = delta_step*(delta_index + 1) + int(start)
if _start >= int(end):
raise endOfJobMetricData()
_end = min(__end, int(end))
else:
_end = int(end)
_start = int(start)
node_inter = get_node_intervals(exec_host)
node_or_dict = {"$or": [{"CompId":{"$gte": ni[0], "$lte": ni[1]}} for ni in node_inter]}
#time_or_dict = {"#TimeTrunSec": {"$gte": int(_start), "$lt": int(_end)}}
time_or_dict = {"#Time": {"$gte": _start, "$lt": _end}}
find_dict = {"$and":[time_or_dict, node_or_dict]}
#est_count = constelation_find_estimated_document_count_sum(metricDataCollections, find_dict)
#print("jobid {} has estimated doc count of {}. Starting find query".format(jobid, est_count))
t_min, t_max = constelation_find_min_max(metricDataCollections, find_dict, '#Time')
job_metricData_generator = constelation_find_append(metricDataCollections, find_dict)
#job_metricData = [{k: m[k] for k in m if k not in exclude_scale_keys} \
# for m in job_metricData_generator]
return job_metricData_generator, t_min, t_max
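# Worked sketch of the delta_step/delta_index windowing above: each call
# selects one fixed-width time slice of the job, clipped to the job's end
# (values illustrative; the repo raises endOfJobMetricData where this sketch
# returns None).
def window_bounds(start, end, delta_step, delta_index):
    lo = delta_step * delta_index + int(start)
    if lo >= int(end):
        return None
    return lo, min(delta_step * (delta_index + 1) + int(start), int(end))
# window_bounds(1000, 1210, 60, 3) -> (1180, 1210); delta_index=4 -> None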
def get_agrigate_job_metrics(jobid, metric_name, host=None, db=None):
"""
DEFUNCT. Must re-write
@@ -360,42 +465,32 @@ def jobs_strata_metrics(start_d, end_d, min_num_nodes, max_num_nodes,
tick = time.time()
try:
try:
-                jobs_data, explain_dict = get_job_metrics(jobid, dbs=dbs, delta_step=delta_step,
-                                                          delta_index=delta_index)
+                jobs_data_generator, t_min, t_max = get_job_metrics(jobid, dbs=dbs, delta_step=delta_step,
+                                                                    delta_index=delta_index)
except endOfJobMetricData:
break
-            except poorlyFormedJobData as e:
-                print('in jobs_strata_metrics, poolyFormedJobData for jobid {}. skipping. Message: {}'.format(jobid, str(e)))
+            except (poorlyFormedJobData, OperationFailure, emptyMetricList) as e:
+                if type(e) == OperationFailure:
+                    print('get_job_metrics raised Operations Failure for jobid {}. skipping. Message: {}'.format(jobid, str(e)))
+                elif type(e) == poorlyFormedJobData:
+                    print('in jobs_strata_metrics, poorlyFormedJobData for jobid {}. skipping. Message: {}'.format(jobid, str(e)))
+                elif type(e) == emptyMetricList:
+                    print('in jobs_strata_metrics, emptyMetricList for jobid {}. skipping. Message: {}'.format(jobid, str(e)))
break
-            time_stamps = [_j["#Time"] for _j in jobs_data]
-            if len(time_stamps) > 0:
-                data_time_length = max(time_stamps) - min(time_stamps)
-            else:
-                data_time_length = 0
            tock = time.time()
-            num_metric_keys = len(set([k for d in jobs_data for k in d.keys() if k not in exclude_keys + exclude_scale_keys]))
-            #num_metric_keys = len(set([k for d in jobs_data for k in d.keys() if k in metric_fields]))
-            if EXPECTED_NUM_KEY_METRICS != num_metric_keys:
-                #raise tooManyMetrics("EXPECTED_NUM_KEY_METRICS != num_metric_keys, {} != {}".format(EXPECTED_NUM_KEY_METRICS, num_metric_keys))
-                print("EXPECTED_NUM_KEY_METRICS != num_metric_keys, "
-                      "{} != {}, jobid {}, skipping".format(EXPECTED_NUM_KEY_METRICS,
-                                                            num_metric_keys, jobid))
-                break
            print(
                "num_node: {}, "
                "jobid: {}, "
-                "num_metric_keys: {}, "
-                "query time (sec): {}, "
-                "data length (doc) {}, "
-                "data_time_length {}".format(nc, jobid,
-                                             num_metric_keys, (tock-tick),
-                                             len(jobs_data), data_time_length)
+                "query time (sec): {:.3f}, "
+                "delta_step: {}, "
+                "delta_index: {}".format(nc, jobid, (tock-tick),
+                                         delta_step, delta_index)
            )
delta_index += 1
-            yield jobid, jobs_data, explain_dict
+            yield jobid, jobs_data_generator, t_min, t_max
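# Consumption sketch for the new yield signature above: each job's metric
# stream can be reduced in a single pass with O(1) memory (arguments to
# jobs_strata_metrics elided; names illustrative).
def stream_doc_counts(strata_gen):
    counts = {}
    for jobid, metric_gen, t_min, t_max in strata_gen:
        counts[jobid] = sum(1 for _ in metric_gen)  # drains the generator once
    return counts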
def jobs_strata(start_d, end_d, min_num_nodes, max_num_nodes,
min_duration, max_duration, limit, hosts_ports=None, dbs=None, offset=None):
@@ -415,11 +510,14 @@ def jobs_strata(start_d, end_d, min_num_nodes, max_num_nodes,
host (str): mongodb router host
db (pymongo obj): pymongo database object
"""
    print("Looking up jobs strata start_d {}, end_d {}, min_num_nodes {}, max_num_nodes {}, min_duration {}, max_duration {}, limit {}".format(
        start_d, end_d, min_num_nodes, max_num_nodes, min_duration, max_duration, limit))
_dbs=[]
if hosts_ports is not None:
for host_port in hosts_ports:
host, port = host_port.split(':')
port = int(port)
client = MongoClient(host, port, username="admin", password="admin")
db = client.get_database(database_name)
_dbs.append(db)
@@ -442,49 +540,51 @@
{
'dateTime': {"$gt": start_d, "$lt": end_d },
'action': "E",
-            'data.Exit_status': '0'
+            'data.Exit_status': '0',
}
)
-    jobs_nc_strata = {}
-    for nc in range(min_num_nodes, max_num_nodes+1):
-        jobs_nc_strata[nc] = [a for a in avail_jobs_data_ended if int(a['data']['unique_node_count']) == nc]
-        jobs_nc_strata[nc] = [
-            a for a in jobs_nc_strata[nc] \
-            if (int(a['data']['end']) - int(a['data']['start']))/60. < max_duration \
-            and (int(a['data']['end']) - int(a['data']['start']))/60. > min_duration
-        ]
-        jobs_data_nc_index = {}
-        num_jobs = len(jobs_nc_strata[nc])
-        if num_jobs == 0:
-            print("num_jobs is 0 for node count {}".format(nc))
-            continue
-        if limit is not None:
+    jobs_nc_strata = {nc: list() for nc in range(min_num_nodes, max_num_nodes+1)}
+    for a in avail_jobs_data_ended:
+        if (int(a['data']['end']) - int(a['data']['start']))/60. < max_duration \
+           and (int(a['data']['end']) - int(a['data']['start']))/60. > min_duration \
+           and int(a['data']['unique_node_count']) >= min_num_nodes \
+           and int(a['data']['unique_node_count']) <= max_num_nodes:
+            jobs_nc_strata[int(a['data']['unique_node_count'])].append(a)
+    _zero_len_strata = [(nc, jobs_nc_strata.pop(nc)) for nc in range(min_num_nodes, max_num_nodes+1) if len(jobs_nc_strata[nc]) == 0 ]
+    if limit is not None:
+        for nc, a in jobs_nc_strata.items():
+            num_jobs = len(a)
            if num_jobs > limit:
                lower = offset if offset is not None else random.choice(range(num_jobs - limit))
                upper = lower + limit
            else:
                lower = 0
                upper = num_jobs
-            jobs_nc_strata[nc] = jobs_nc_strata[nc][lower:upper]
+            jobs_nc_strata[nc] = a[lower:upper]
return jobs_nc_strata
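# Now that constelation_find_append yields lazily, avail_jobs_data_ended can
# only be traversed once, hence the single filtering pass above. The
# limit/offset logic then keeps one window of jobs per stratum; a standalone
# sketch of that window selection (illustrative, mirroring the slicing above):
import random

def strata_window(jobs, limit, offset=None):
    if limit is None or len(jobs) <= limit:
        return jobs
    lower = offset if offset is not None else random.choice(range(len(jobs) - limit))
    return jobs[lower:lower + limit]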
def make_job_cube(data_from_mongo, data_cube_t_length=60,
-                  feature_major_order=False, key_length=None):
+                  feature_major_order=False, key_length=None,
+                  t_min=None, t_max=None, jobid=None, normalize=False):
'''
:int data_cube_t_length: data_cube_t_length can be None to return actual data length
:int key_length: if key_length is not None, this func will return a job cube of all 0's
'''
+    if data_from_mongo is not None:
+        i_data_from_mongo = next(data_from_mongo)
if key_length is not None:
_key_length = key_length
else:
-        keys = sorted(list(set([k for d in data_from_mongo for k in d.keys() if k not in exclude_keys + exclude_scale_keys])))
+        keys = sorted(list(set([k for k in i_data_from_mongo.keys() if k not in exclude_keys + exclude_scale_keys])))
#keys = sorted(list(set([k for d in data_from_mongo for k in d.keys() if k not in metric_fields])))
_key_length = len(keys)
keys_id = list(range(_key_length))
if feature_major_order:
pass
else:
@@ -497,18 +597,29 @@
data_t_cube = np.zeros(data_shape, dtype=np.float32)
if data_from_mongo is not None and data_cube_t_length is not None:
-        _wip = [l["#Time"] for l in data_from_mongo]
-        max_sec = int(max(_wip))
-        min_sec = int(min(_wip))
-        num_min = int(( int(max_sec) - int(min_sec))/60.) + 1
-        for dm in data_from_mongo:
-            i = int((int(dm['#Time']) - min_sec)/60)
+        num_min = int(( int(t_max) - int(t_min))/60.) + 1
+        def _rewind_data_from_mongo():
+            yield i_data_from_mongo
+            for dm in data_from_mongo:
+                yield dm
+        for doc_count, dm in enumerate(_rewind_data_from_mongo()):
+            i = int((int(dm['#Time']) - t_min)/60)
if i >= clip_t_index:
continue
x,y,z = [dm[k] for k in coord_keys]
ordered_values = [float(dm[k]) for k in keys]
np.put(data_t_cube[i,x,y,z], keys_id, ordered_values)
print('make_data_cube jobid: {}, doc_count: {}'.format(jobid, doc_count))
# normalize
if normalize:
d_cube_min = np.min(data_t_cube, axis=0)
d_cube_scale = np.max(data_t_cube, axis=0) - d_cube_min
data_t_cube = (data_t_cube - d_cube_min)/d_cube_scale
np.nan_to_num(data_t_cube, copy=False)
return data_t_cube
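# The normalization above is a per-feature min-max scaling along the time
# axis; features that never change give 0/0 -> NaN, which nan_to_num() then
# flattens to zero in place. Self-contained sketch of the same step:
import numpy as np

def min_max_normalize(cube):
    lo = np.min(cube, axis=0)
    scale = np.max(cube, axis=0) - lo
    out = (cube - lo) / scale       # NaN wherever a feature is constant
    np.nan_to_num(out, copy=False)  # constant features become all zeros
    return out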
def get_constelation_hosts(host=None, db=None):
@@ -552,6 +663,6 @@ if __name__== "__main__":
'open interval, e.g. [0,1). ')
args = parser.parse_args()
-    get_job_metrics(args.jobid, host=args.host)
+    get_auto_nomalized_job_metrics(args.jobid, host=args.host)
@@ -8,6 +8,7 @@ from sys import getsizeof
import tensorflow as tf
import numpy as np
import itertools
import time
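# The body of chunk_up below is elided by the next hunk; a conventional
# implementation of its docstring (an assumption, not confirmed by this
# diff) is a strided slice generator:
#
#     def chunk_up(l, n):
#         """Yield successive n-sized chunks from l."""
#         for i in range(0, len(l), n):
#             yield l[i:i + n]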
def chunk_up(l, n):
"""Yield successive n-sized chunks from l."""
@@ -19,7 +20,7 @@ def build_strata_dataset(start_d, end_d,
min_duration, max_duration,
limit, index=0, num_workers=1, data_cube_t_length=60, offset=None, shuffle=False,
hosts_ports=None, dbs=None):
-    delta_step=2*data_cube_t_length*60
+    delta_step=data_cube_t_length*60
def job_metric_gen():
jobs_strata_gen = jobs_strata_metrics(start_d, end_d,
min_num_nodes, max_num_nodes,
@@ -32,11 +33,18 @@
yield [-1], zero_cube_chunk, ['noJob'.encode('utf-8')]
prev_jobid = None
chunk_i=0
-        for jobid, jm, _ in jobs_strata_gen:
-            d_cube = make_job_cube(jm, data_cube_t_length=data_cube_t_length)
+        for jobid, jm, t_min, t_max in jobs_strata_gen:
+            tick = time.time()
+            try:
+                d_cube = make_job_cube(jm, data_cube_t_length=data_cube_t_length, jobid=jobid,
+                                       t_min=t_min, t_max=t_max, normalize=True)
+            except StopIteration:
+                print('jobid {} has zero length, skipping'.format(jobid))
+                continue
+            tock = time.time()
+            print("rank {}> jobid {}, make_job_cube wall clock time (sec): {:.3f}".format(index, jobid, tock-tick))
d_cube_chunked = chunk_up(d_cube, data_cube_t_length)
#fillvalue=zero_cube_batch)
# the following is what we are shooting for
# list(itertools.zip_longest(list(chunk_up(np.arange(10), 3)), list(chunk_up(np.arange(17), 3))))
@@ -53,7 +61,7 @@
tf.TensorShape(data_shape),
tf.TensorShape([1]))
)
-    dataset = dataset.prefetch(4)
+    dataset = dataset.prefetch(3)
return dataset
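# Minimal self-contained sketch of the from_generator + prefetch plumbing
# used above (dtypes and shapes are illustrative, not the repo's exact cube
# shapes; TF 1.x-style output_types/output_shapes arguments):
import numpy as np
import tensorflow as tf

def toy_gen():
    for i in range(4):
        yield np.full((2, 3), i, dtype=np.float32), [i]

toy_dataset = tf.data.Dataset.from_generator(
    toy_gen,
    (tf.float32, tf.int32),
    (tf.TensorShape([2, 3]), tf.TensorShape([1])))
toy_dataset = toy_dataset.prefetch(3)  # keep a few elements in flight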
def build_dataset_from_jobid_list(job_ids, host=None, db=None):
......