Commit 8f284281 authored by Aaron Saxton

First commit, multiple cluster run support and pyhpcmongodb support

(The largest file in this commit could not be displayed; the remaining files are shown below.)
#!/bin/bash
#PBS -l walltime=6:00:00
#PBS -N multiple_clusters_ingest
#PBS -l nodes=3168:ppn=32:xe
#PBS -l flags=commtransparent
##PBS -l advres=saxton
MONGOBWMONITORINGMETRICS_SRC=${HOME}/Development/mongobwmonitoringmetrics
export NUM_PE_TORQUE=32
export NUM_PE_METRIC=256
source ${MONGOBWMONITORINGMETRICS_SRC}/job_scripts/run_multiple_clusters_ingest.sh
EX_LIST_OF_CONF_FILES="mongo_sitevar_201501.sh
mongo_sitevar_201502.sh
mongo_sitevar_201503.sh
mongo_sitevar_201504.sh
mongo_sitevar_201505.sh
mongo_sitevar_201506.sh
mongo_sitevar_201507.sh
mongo_sitevar_201508.sh
mongo_sitevar_201509.sh
mongo_sitevar_201510.sh
mongo_sitevar_201511.sh
mongo_sitevar_201512.sh
mongo_sitevar_201601.sh
mongo_sitevar_201602.sh
mongo_sitevar_201603.sh
mongo_sitevar_201604.sh
mongo_sitevar_201605.sh
mongo_sitevar_201606.sh
mongo_sitevar_201607.sh
mongo_sitevar_201608.sh
mongo_sitevar_201609.sh
mongo_sitevar_201610.sh
mongo_sitevar_201611.sh
mongo_sitevar_201612.sh
"
EX_LIST_OF_CONF_FILES="mongo_sitevar_201901.sh
mongo_sitevar_201902.sh
mongo_sitevar_201903.sh
mongo_sitevar_201904.sh
mongo_sitevar_201905.sh
mongo_sitevar_201906.sh
mongo_sitevar_201907.sh
mongo_sitevar_201908.sh
mongo_sitevar_201909.sh
mongo_sitevar_201910.sh
mongo_sitevar_201911.sh
mongo_sitevar_201912.sh"
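# Prepend the full site_envs/ path to each config file name (prepending also reverses the list order).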
for F in $EX_LIST_OF_CONF_FILES; do
_EX_LIST_OF_CONF_FILES="$MONGOBWMONITORINGMETRICS_SRC/site_envs/$F $_EX_LIST_OF_CONF_FILES"
done
EX_LIST_OF_CONF_FILES=$_EX_LIST_OF_CONF_FILES
launch_clusters_and_ingest $EX_LIST_OF_CONF_FILES
#!/bin/bash
#PBS -l walltime=6:00:00
#PBS -N run_and_encode
#PBS -l nodes=256:ppn=32:xe+128:ppn=16:xk
#PBS -l flags=commtransparent
module load bwpy
module load bwpy-mpi
source ~/Development/hpcmongodb/virenv/bin/activate
START_DATE="20180101"
END_DATE="20180201"
CHUNK_FACTOR=5
export CONFIG_SVRS_NUMBER_OF_INSTANCES=7
export ROUTER_SVRS_NUMBER_OF_INSTANCES=256
export ROUTER_SVRS_CONCURRENCY_PER_NODE=4
export SHARD_SVRS_NUMBER_OF_INSTANCES=256
export SHARD_SVRS_CONCURRENCY_PER_NODE=2
INIT_NUM_CHUNKS=$(( ${CHUNK_FACTOR} * ${SHARD_SVRS_NUMBER_OF_INSTANCES} ))
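# numInitialChunks below pre-splits each hashed collection into CHUNK_FACTOR chunks per shard,
# presumably so the bulk load does not depend on the balancer (which is disabled in INIT_EVAL_STR).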
export MONGO_BIN=/opt/mongodb/4.0.5/bin
export MONGO_BASE_DIR=/u/staff/saxton/scratch/hpcmongodb/metric_store_${SHARD_SVRS_NUMBER_OF_INSTANCES}_shards_${START_DATE}_${END_DATE}
export MONGO_TMP=${MONGO_BASE_DIR}
export USE_MEMORY_AS_DISK=true
export READ_ONLY=true
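# INIT_EVAL_STR holds mongo shell commands evaluated once at cluster initialization: enable sharding,
# pre-split and pin the collections, build the query indexes, and raise the chunk size.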
export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
sh.shardCollection( \"monitoringData.metricData\", { k_to_h : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.metricData\")
sh.shardCollection( \"monitoringData.torqueData\", { jobid : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.torqueData\")
monitoringData = db.getSiblingDB(\"monitoringData\")
monitoringData.metricData.createIndex({CompId: 1})
monitoringData.metricData.createIndex({\"#Time\": 1})
monitoringData.torqueData.createIndex({dateTime: 1})
sh.disableAutoSplit()
config = db.getSiblingDB(\"config\")
config.settings.save( { _id:\"chunksize\", value: 1024 } )"
SRC_DIR="/mnt/a/u/staff/saxton/Development/hpcmongodb" # "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
export PROG_BOOT_LOCK_FILE=${SRC_DIR}/cluster_run/prog_boot_lock.lock
touch ${PROG_BOOT_LOCK_FILE}
cd ${SRC_DIR}/cluster_run
touch ${PROG_BOOT_LOCK_FILE}
./run.sh &
RUN_PID=$!
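# run.sh is expected to remove PROG_BOOT_LOCK_FILE once the cluster is up
# (or to leave mongo_force_shutdown.sem behind if startup fails), so poll until the lock disappears.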
echo "sleep waitintg for cluster to become available"
while [ -f ${PROG_BOOT_LOCK_FILE} ]
do
sleep 2
done
if [ -f $MONGO_BASE_DIR/mongo_force_shutdown.sem ]
then
echo "Cluster Failed Startup, exiting"
# exit status wraps modulo 256, so "exit 256" would report success; use a nonzero value that survives
exit 1
else
echo "Cluster is alive!"
fi
cd /u/staff/saxton/Development/rnnsystemmonitor
echo "changed directory to $(pwd). running aprun"
export NUM_WORKERS=128
H=$( cat ${MONGO_BASE_DIR}/router_svrs_and_ports.txt )
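# router_svrs_and_ports.txt is assumed to be written during cluster startup; it lists the mongos
# router host:port endpoints handed to the encoding workers via --hosts.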
aprun -n ${NUM_WORKERS} -N 1 -- python test_bed.py --hosts $H > encode_log_$(date +"%d-%m-%Y-%H-%M-%S").txt
kill -SIGUSR1 ${RUN_PID}
wait
#!/bin/bash
#PBS -l walltime=4:40:00
#PBS -N run_and_ingest
#PBS -l nodes=264:ppn=32:xe
#PBS -l flags=commtransparent
##PBS -l advres=saxton
module load bwpy
module load bwpy-mpi
source ~/Development/hpcmongodb/virenv/bin/activate
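# If MONGOCONFIGPATH points at a site config, source it; otherwise fall back to the inline defaults below.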
if [ -z "${MONGOCONFIGPATH}" ]
then
START_DATE="20180201"
END_DATE="20180301"
CHUNK_FACTOR=5
export CONFIG_SVRS_NUMBER_OF_INSTANCES=7
export ROUTER_SVRS_NUMBER_OF_INSTANCES=256
export ROUTER_SVRS_CONCURRENCY_PER_NODE=4
export SHARD_SVRS_NUMBER_OF_INSTANCES=256
export SHARD_SVRS_CONCURRENCY_PER_NODE=2
INIT_NUM_CHUNKS=$(( ${CHUNK_FACTOR} * ${SHARD_SVRS_NUMBER_OF_INSTANCES} ))
export MONGO_BIN=/opt/mongodb/4.0.5/bin
export MONGO_BASE_DIR=/u/staff/saxton/scratch/hpcmongodb/metric_store_${SHARD_SVRS_NUMBER_OF_INSTANCES}_shards_${START_DATE}_${END_DATE}
export MONGO_TMP=${MONGO_BASE_DIR}
export USE_MEMORY_AS_DISK=true
export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
sh.shardCollection( \"monitoringData.metricData\", { k_to_h : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.metricData\")
sh.shardCollection( \"monitoringData.torqueData\", { jobid : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.torqueData\")
monitoringData = db.getSiblingDB(\"monitoringData\")
monitoringData.metricData.createIndex({CompId: 1})
monitoringData.metricData.createIndex({\"#Time\": 1})
monitoringData.torqueData.createIndex({dateTime: 1})
sh.disableAutoSplit()
config = db.getSiblingDB(\"config\")
config.settings.save( { _id:\"chunksize\", value: 1024 } )"
SRC_DIR="/mnt/a/u/staff/saxton/Development/hpcmongodb" # "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
else
source ${MONGOCONFIGPATH}
fi
export PROG_BOOT_LOCK_FILE=${SRC_DIR}/cluster_run/prog_boot_lock.lock
touch ${PROG_BOOT_LOCK_FILE}
cd ${SRC_DIR}/cluster_run
touch ${PROG_BOOT_LOCK_FILE}
./run.sh &
RUN_PID=$!
echo "sleep waitintg for cluster to become available"
while [ -f ${PROG_BOOT_LOCK_FILE} ]
do
sleep 2
done
if [ -f $MONGO_BASE_DIR/mongo_force_shutdown.sem ]
then
echo "Cluster Failed Startup, exiting"
exit 246
else
echo "Cluster is alive!"
fi
cd /mnt/a/u/staff/saxton/Development/mongobwmonitoringmetrics
echo "in dir $(pwd)"
cd pyhpcmongodb/ingest
module load bwpy
module load bwpy-mpi
H=$( cat ${MONGO_BASE_DIR}/router_svrs_and_ports.txt )
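# Two ingest passes: torque job records first (20 PEs), then the metric data (256 PEs).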
aprun -n 20 -N 2 -- python torque.py $H --dateRange ${START_DATE} ${END_DATE}
aprun -n 256 -N 4 -- python run.py $H --dateRange ${START_DATE} ${END_DATE} --metric_name ${MONGO_BASE_DIR}/ingest_${PBS_JOBID}
#sleep $(($PBS_WALLTIME-60*60))
kill -SIGUSR1 ${RUN_PID}
wait
#!/bin/bash
#PBS -l walltime=6:00:00
#PBS -N run_and_wait
#PBS -l nodes=200:ppn=32:xe
#PBS -l flags=commtransparent
##PBS -l advres=saxton
module load bwpy
module load bwpy-mpi
echo "MONGOCONFIGPATH
${MONGOCONFIGPATH}"
if [ -z "${MONGOCONFIGPATH}" ]
then
CHUNK_FACTOR=5
export CONFIG_SVRS_NUMBER_OF_INSTANCES=7
export ROUTER_SVRS_NUMBER_OF_INSTANCES=256
export ROUTER_SVRS_CONCURRENCY_PER_NODE=4
export SHARD_SVRS_NUMBER_OF_INSTANCES=256
export SHARD_SVRS_CONCURRENCY_PER_NODE=2
#total 325 nodes
INIT_NUM_CHUNKS=$(( ${CHUNK_FACTOR} * ${SHARD_SVRS_NUMBER_OF_INSTANCES} ))
export MONGO_BIN=/opt/mongodb/4.0.5/bin
export MONGO_BASE_DIR=/u/staff/saxton/scratch/hpcmongodb/metric_store_256_shards_20180101_20180201
#export MONGO_BASE_DIR=/projects/monitoring_data/hpcmongo_monitoringData/bulk_store_${SHARD_SVRS_NUMBER_OF_INSTANCES}_shards_ramDisk
export MONGO_TMP=${MONGO_BASE_DIR}
export USE_MEMORY_AS_DISK=true
export READ_ONLY=true
export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
sh.shardCollection( \"monitoringData.metricData\", { k_to_h : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.metricData\")
sh.shardCollection( \"monitoringData.torqueData\", { jobid : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.torqueData\")
monitoringData = db.getSiblingDB(\"monitoringData\")
monitoringData.metricData.createIndex({CompId: 1})
monitoringData.metricData.createIndex({\"#Time\": 1})
monitoringData.torqueData.createIndex({dateTime: 1})
sh.disableAutoSplit()
config = db.getSiblingDB(\"config\")
config.settings.save( { _id:\"chunksize\", value: 2048 } )"
SRC_DIR="/mnt/a/u/staff/saxton/Development/hpcmongodb" # "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
else
source ${MONGOCONFIGPATH}
fi
export PROG_BOOT_LOCK_FILE=${SRC_DIR}/cluster_run/prog_boot_lock.lock
touch ${PROG_BOOT_LOCK_FILE}
cd ${SRC_DIR}/cluster_run
touch ${PROG_BOOT_LOCK_FILE}
./run.sh &
RUN_PID=$!
echo "sleep waitintg for cluster to become available"
while [ -f ${PROG_BOOT_LOCK_FILE} ]
do
sleep 2
done
echo "cluster is alive!"
cd ${SRC_DIR}
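# Keep the cluster up until roughly one hour before the walltime limit, then signal run.sh (SIGUSR1) to shut it down.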
sleep $(($PBS_WALLTIME-60*60))
kill -SIGUSR1 ${RUN_PID}
wait
#!/bin/bash -e
module load bwpy
module load bwpy-mpi
HPCMONGODB_SRC=${HOME}/Development/hpcmongodb
MONGOBWMONITORINGMETRICS_SRC=${HOME}/Development/mongobwmonitoringmetrics
source $HPCMONGODB_SRC/job_tools/run_multiple_clusters.sh
source $MONGOBWMONITORINGMETRICS_SRC/mongobwmonitoringmetrics-virenv/bin/activate
launch_clusters_and_ingest() {
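# Launch one Mongo cluster per config file, ingest torque data for every cluster, then the metric
# data, and finally stop the run.sh instances started by launch_clusters.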
LIST_OF_CONF_FILES=$@
launch_clusters $LIST_OF_CONF_FILES
export RUN_PID_LIST
export STARTER_HOSTS
echo "RUN_PID_LIST ${RUN_PID_LIST}"
echo "STARTER_HOSTS $STARTER_HOSTS"
for CONF_FILE in ${LIST_OF_CONF_FILES}; do
echo ${CONF_FILE}
source $CONF_FILE
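# Each site config sets START_DATE, END_DATE, MONGO_TMP, and the per-month cluster sizing.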
aprun -n ${NUM_PE_TORQUE:-32} -N 4 -- python -m pyhpcmongodb.ingest.torque --dateRange $START_DATE $END_DATE $STARTER_HOSTS > ${MONGO_TMP}/logs/torque_${START_DATE}_${END_DATE}.log 2>&1 &
INGEST_PID=$!
INGEST_PID_LIST="$INGEST_PID $INGEST_PID_LIST"
done
for p in $INGEST_PID_LIST; do
echo "Waiting on torque PID $p"
wait $p
done
INGEST_PID_LIST=""
for CONF_FILE in ${LIST_OF_CONF_FILES}; do
echo ${CONF_FILE}
source $CONF_FILE
aprun -n ${NUM_PE_METRIC:-32} -N 4 -- python -m pyhpcmongodb.ingest.run --dateRange $START_DATE $END_DATE $STARTER_HOSTS > ${MONGO_TMP}/logs/metric_${START_DATE}_${END_DATE}.log 2>&1 &
INGEST_PID=$!
INGEST_PID_LIST="$INGEST_PID $INGEST_PID_LIST"
done
for p in $INGEST_PID_LIST; do
echo "Waiting on ingest PID $p"
wait $p
done
for p in $RUN_PID_LIST; do
echo "Killing run.sh PID $p"
kill $p
done
for p in $RUN_PID_LIST; do
echo "Waiting on run.sh PID $p"
wait $p
done
}
CHUNK_FACTOR=5
START_DATE="20180101"
END_DATE="20180102"
export CONFIG_SVRS_NUMBER_OF_INSTANCES=2
export ROUTER_SVRS_NUMBER_OF_INSTANCES=8
export ROUTER_SVRS_CONCURRENCY_PER_NODE=4
export SHARD_SVRS_NUMBER_OF_INSTANCES=8
export SHARD_SVRS_CONCURRENCY_PER_NODE=2
INIT_NUM_CHUNKS=$(( ${CHUNK_FACTOR} * ${SHARD_SVRS_NUMBER_OF_INSTANCES} ))
export MONGO_BIN=/opt/mongodb/4.0.5/bin
export MONGO_BASE_DIR=/u/staff/saxton/scratch/hpcmongodb/test/metric_store_${SHARD_SVRS_NUMBER_OF_INSTANCES}_shards_${START_DATE}_${END_DATE}
export MONGO_TMP=${MONGO_BASE_DIR}
export USE_MEMORY_AS_DISK=true
export READ_ONLY=false #true
export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
sh.shardCollection( \"monitoringData.metricData\", { k_to_h : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.metricData\")
sh.shardCollection( \"monitoringData.torqueData\", { jobid : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.torqueData\")
monitoringData = db.getSiblingDB(\"monitoringData\")
monitoringData.metricData.createIndex({CompId: 1})
monitoringData.metricData.createIndex({\"#Time\": 1})
monitoringData.torqueData.createIndex({dateTime: 1})
sh.disableAutoSplit()
config = db.getSiblingDB(\"config\")
config.settings.save( { _id:\"chunksize\", value: 2048 } )"
SRC_DIR="/mnt/a/u/staff/saxton/Development/hpcmongodb" # "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
CHUNK_FACTOR=5
START_DATE="20180201"
END_DATE="20180202"
export CONFIG_SVRS_NUMBER_OF_INSTANCES=2
export ROUTER_SVRS_NUMBER_OF_INSTANCES=8
export ROUTER_SVRS_CONCURRENCY_PER_NODE=4
export SHARD_SVRS_NUMBER_OF_INSTANCES=8
export SHARD_SVRS_CONCURRENCY_PER_NODE=2
INIT_NUM_CHUNKS=$(( ${CHUNK_FACTOR} * ${SHARD_SVRS_NUMBER_OF_INSTANCES} ))
export MONGO_BIN=/opt/mongodb/4.0.5/bin
export MONGO_BASE_DIR=/u/staff/saxton/scratch/hpcmongodb/test/metric_store_${SHARD_SVRS_NUMBER_OF_INSTANCES}_shards_${START_DATE}_${END_DATE}
export MONGO_TMP=${MONGO_BASE_DIR}
export USE_MEMORY_AS_DISK=true
export READ_ONLY=false #true
export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
sh.shardCollection( \"monitoringData.metricData\", { k_to_h : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.metricData\")
sh.shardCollection( \"monitoringData.torqueData\", { jobid : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.torqueData\")
monitoringData = db.getSiblingDB(\"monitoringData\")
monitoringData.metricData.createIndex({CompId: 1})
monitoringData.metricData.createIndex({\"#Time\": 1})
monitoringData.torqueData.createIndex({dateTime: 1})
sh.disableAutoSplit()
config = db.getSiblingDB(\"config\")
config.settings.save( { _id:\"chunksize\", value: 2048 } )"
SRC_DIR="/mnt/a/u/staff/saxton/Development/hpcmongodb" # "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
CHUNK_FACTOR=5
START_DATE="20180301"
END_DATE="20180302"
export CONFIG_SVRS_NUMBER_OF_INSTANCES=2
export ROUTER_SVRS_NUMBER_OF_INSTANCES=8
export ROUTER_SVRS_CONCURRENCY_PER_NODE=4
export SHARD_SVRS_NUMBER_OF_INSTANCES=8
export SHARD_SVRS_CONCURRENCY_PER_NODE=2
INIT_NUM_CHUNKS=$(( ${CHUNK_FACTOR} * ${SHARD_SVRS_NUMBER_OF_INSTANCES} ))
export MONGO_BIN=/opt/mongodb/4.0.5/bin
export MONGO_BASE_DIR=/u/staff/saxton/scratch/hpcmongodb/test/metric_store_${SHARD_SVRS_NUMBER_OF_INSTANCES}_shards_${START_DATE}_${END_DATE}
export MONGO_TMP=${MONGO_BASE_DIR}
export USE_MEMORY_AS_DISK=true
export READ_ONLY=false #true
export INIT_EVAL_STR="sh.enableSharding(\"monitoringData\")
sh.shardCollection( \"monitoringData.metricData\", { k_to_h : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.metricData\")
sh.shardCollection( \"monitoringData.torqueData\", { jobid : \"hashed\" }, false, { numInitialChunks: ${INIT_NUM_CHUNKS} } )
sh.disableBalancing(\"monitoringData.torqueData\")
monitoringData = db.getSiblingDB(\"monitoringData\")
monitoringData.metricData.createIndex({CompId: 1})
monitoringData.metricData.createIndex({\"#Time\": 1})
monitoringData.torqueData.createIndex({dateTime: 1})
sh.disableAutoSplit()
config = db.getSiblingDB(\"config\")
config.settings.save( { _id:\"chunksize\", value: 2048 } )"
SRC_DIR="/mnt/a/u/staff/saxton/Development/hpcmongodb" # "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
#!/bin/bash -e
#PBS -l walltime=6:00:00
#PBS -N test_multiple_clusters_ingest
#PBS -l nodes=48:ppn=32:xe
#PBS -l flags=commtransparent
##PBS -l advres=saxton
export NUM_PE_TORQUE=32
export NUM_PE_METRIC=32
source run_multiple_clusters_ingest.sh
EX_LIST_OF_CONF_FILES="test.mongo_sitevar_201801.sh
test.mongo_sitevar_201802.sh
test.mongo_sitevar_201803.sh"
launch_clusters_and_ingest $EX_LIST_OF_CONF_FILES
#!/bin/bash -e
MONGOBWMONITORINGMETRICS_SRC=${HOME}/Development/mongobwmonitoringmetrics
export NUM_PE_TORQUE=32
export NUM_PE_METRIC=256
source ${MONGOBWMONITORINGMETRICS_SRC}/job_scripts/run_multiple_clusters_ingest.sh
EX_LIST_OF_CONF_FILES="mongo_sitevar_201701.sh
mongo_sitevar_201702.sh
mongo_sitevar_201703.sh"
for F in $EX_LIST_OF_CONF_FILES; do
_EX_LIST_OF_CONF_FILES="$MONGOBWMONITORINGMETRICS_SRC/site_envs/$F $_EX_LIST_OF_CONF_FILES"
done
EX_LIST_OF_CONF_FILES=$_EX_LIST_OF_CONF_FILES
HPCMONGODB_SRC=${HOME}/Development/hpcmongodb
source $HPCMONGODB_SRC/job_tools/run_multiple_clusters.sh
#EX_LIST_OF_CONF_FILES="test.mongo_sitevar_201801.sh
#test.mongo_sitevar_201802.sh
#test.mongo_sitevar_201803.sh"
launch_clusters $EX_LIST_OF_CONF_FILES
echo "RUN_PID_LIST $RUN_PID_LIST"
echo "STARTER_HOSTS $STARTER_HOSTS"
import argparse
first_x_sec = 1800
last_x_sec = 1800
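# Summarizes a metric-report file whose lines follow the ingest report template
# ("rank: ..., c_time: ..., lines: ..., delta_t: ..., file: ...") into overall,
# first-window, and last-window ingest rates in lines/sec.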
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='summarize an ingest metric report')
    parser.add_argument('file_name', type=str,
                        help='path to the metric report file to summarize')
    args = parser.parse_args()
    with open(args.file_name) as f:
        d_raw = [l for l in f]
    # field order matches the report lines: rank, c_time, lines, delta_t, file
    field_type = [int, float, int, float, lambda x: str(x).strip()]
    dat = [
        {
            kv.split(':')[0].strip(): t(kv.split(':')[1])
            for kv, t in zip(d.split(','), field_type)
        }
        for d in d_raw
    ]
    # overall ingest rate across the full report
    start = min([l['c_time'] for l in dat])
    end = max([l['c_time'] for l in dat])
    tot_lines = sum([l['lines'] for l in dat])
    tot_line_sec = tot_lines / (end - start)
    print('tot_line_sec: {}'.format(tot_line_sec))
    # rate over the first first_x_sec seconds
    end_limit = start + first_x_sec
    tot_lines = sum([l['lines'] for l in dat if l['c_time'] < end_limit])
    tot_time = max([l['c_time'] for l in dat if l['c_time'] < end_limit])
    tot_line_sec = tot_lines / (tot_time - start)
    print('first_x_sec: {}, tot_line_sec: {}'.format(first_x_sec, tot_line_sec))
    # rate over the last last_x_sec seconds
    start_limit = end - last_x_sec
    tot_lines = sum([l['lines'] for l in dat if l['c_time'] > start_limit])
    tot_time = min([l['c_time'] for l in dat if l['c_time'] > start_limit])
    tot_line_sec = tot_lines / (end - tot_time)
    print('last_x_sec: {}, tot_line_sec: {}'.format(last_x_sec, tot_line_sec))
#date --date '01/01/2018 03:00:00' +"%s" #to get epoch time
try:
    from mpi4py import MPI
    import mpi4py
except ImportError:
    has_mpi = False
else:
    has_mpi = True
    comm = MPI.COMM_WORLD
    nprocs = comm.Get_size()
    if nprocs == 1:
        has_mpi = False
if not has_mpi:
    print('No MPI')
import math
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
from pyhpcmongodb.query.get import get_constelation_hosts
from datetime import datetime as dt
import datetime
import numpy as np
import argparse
import os
import time
import random
import re
from os.path import join as ospJoin
monitoring_data_path = "/projects/monitoring_data/ovis"
#monitoring_data_path = "/mnt/a/u/staff/saxton/Development/hpcmongodb/ingest/moc_data"
f_name_header = 'HEADER.20160115-'
database_name = 'monitoringData'
#metric_file_name = "metric_full_job_ir{ir}_rh{rh}.txt"
metric_file_name_template = "{name}_ir{ir}_rh{rh}.txt"
metric_report = "rank: {rank}, c_time: {c_time}, lines: {lines}, delta_t: {delta_t}, file: {file}\n"
DOC_BLOCK_SIZE=128
#DOC_BLOCK_PROC_SIZE=1000000*1500  # earlier tuning value, superseded below
#DOC_BLOCK_PROC_SIZE=100000*1500   # earlier tuning value, superseded below
DOC_BLOCK_PROC_SIZE=20000*1500 #1500 is average line length
TEST=False
if has_mpi:
    print("has_mpi, nprocs {}, rank {}".format(nprocs, comm.rank))
    print_move_line = nprocs - comm.rank + 1
    status_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + [' {}>line_count {}, %{}, lines/sec {:.3f}, file {}, offset {}, blkErrCount {}'] + ['\n']*print_move_line + ['\r'])
    found_eof_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + [' {}>Found EOF, file {}, offset {}'] + ['\n']*print_move_line + ['\r'])
    found_eof_wait_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + [' {}>Found EOF, waiting, file {}, offset {}'] + ['\n']*print_move_line + ['\r'])
    at_barr_str = ''.join(["\033[F"]*print_move_line + [str(comm.rank)] + ['>At Barrier'] + ['\n']*print_move_line + ['\r'])
    no_top_status_str = ' {}>line_count {}, %{}, lines/sec {:.3f}, file {}, offset {}, blkErrCount {}'
    no_top_found_eof_str = ' {}>Found EOF, file {}, offset {}'
    no_top_found_eof_wait_str = ' {}>Found EOF, waiting, file {}, offset {}'
    no_top_at_barr_str = ''.join([str(comm.rank)] + ['>At Barrier'])
else:
    status_str = '>line_count {}, lines/sec {:.3f}, file {}, offset {}, blkErrCount {}\r'