Commit db400ba

logger, display output

ranjithkumar007 committed Apr 16, 2019
1 parent d0573a0 commit db400ba
Showing 9 changed files with 295 additions and 102 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,3 +4,4 @@ test.py
my_jobs/
test/
logs/
main_log_data.txt
82 changes: 76 additions & 6 deletions flamingo/core/matchmaker.py
@@ -2,22 +2,92 @@
import sys
from .messages.message import Message
from .messages.utils import send_msg, get_resources
from .jobs.utils import calculate_job_priority

MAX_RUNNING_JOBS_PER_NODE = 5
COEFF_MIGRATION_COST = 5
COEFF_CPU_USAGE = 5
COEFF_AVG_PROC_LOAD = 5
COEFF_FREE_MEM = 2000

def signal_handler(sig, frame):
if sig == signal.SIGUSR1:
pass

def calc_matching_score(resources, num_running_jobs, is_source, mem):
    # Lower score is better: penalise migrating away from the source node,
    # current CPU usage, average process load, and a shrinking free-memory
    # margin once the job's requested memory (mem) is placed on the node.
    total_cost = (1 - is_source) * COEFF_MIGRATION_COST + resources['cpu_usage'] * COEFF_CPU_USAGE \
        + resources['process_load'] * COEFF_AVG_PROC_LOAD + \
        (1.0 / (resources['memory'] - mem)) * COEFF_FREE_MEM

    return total_cost

# decrement resources
def match(job, resources, running_jobs):
# greedy approach
def match(job, resources, running_jobs, lost_resources):
mem = job.attr['max_memory']
cores = job.attr['cores']
source_ip = job.source_ip

assigned_ip = None
preempt_job_id = None

my_job_p = calculate_job_priority(job)

preempt_job = {}

candidate_ips = []

get_mem = lambda ip : resources[ip]['memory'] - lost_resources[ip]['memory']
get_cores = lambda ip: resources[ip]['cores'] - lost_resources[ip]['cores']

for ip in resources.keys():
if resources[ip]['memory'] >= mem and resources[ip]['cores'] >= cores:
assigned_ip = ip
return assigned_ip, preempt_job_id
if get_mem(ip) >= mem and get_cores(ip) >= cores and len(running_jobs[ip]) < MAX_RUNNING_JOBS_PER_NODE:
candidate_ips.append(ip)

    for ip in resources.keys():
        min_job = None
        min_job_p = my_job_p

        # Look for a lower-priority running job whose resources, if preempted,
        # would free enough memory and cores for the incoming job.
        for running_job in running_jobs[ip]:
            job_p = calculate_job_priority(running_job)

            if job_p < min_job_p and (get_mem(ip) + running_job.attr['max_memory']) >= mem and \
                    (get_cores(ip) + running_job.attr['cores']) >= cores:
                min_job_p = job_p
                min_job = running_job

        if min_job:
            candidate_ips.append(ip)
            preempt_job[ip] = min_job

    if len(candidate_ips) > 0:
        best_score = 999999999
        for ip in candidate_ips:
            # Work on a copy so scoring does not mutate the shared resource table.
            temp = dict(resources[ip])
            temp['memory'] = get_mem(ip)
            temp['cores'] = get_cores(ip)

            num_running_jobs = len(running_jobs[ip])

            if ip in preempt_job:
                temp['memory'] += preempt_job[ip].attr['max_memory']
                temp['cores'] += preempt_job[ip].attr['cores']
                num_running_jobs -= 1

            cur_score = calc_matching_score(temp, num_running_jobs, ip == source_ip, mem)

            if cur_score < best_score:
                best_score = cur_score
                assigned_ip = ip

    if assigned_ip:
        lost_resources[assigned_ip]['memory'] += mem
        lost_resources[assigned_ip]['cores'] += cores

        if assigned_ip in preempt_job:
            preempt_job_id = preempt_job[assigned_ip].job_id
            lost_resources[assigned_ip]['memory'] += preempt_job[assigned_ip].attr['max_memory']
            lost_resources[assigned_ip]['cores'] += preempt_job[assigned_ip].attr['cores']

return assigned_ip, preempt_job_id

@@ -32,7 +102,7 @@ def matchmaking(my_node):

while not my_node.leader_jobPQ.empty():
job = my_node.leader_jobPQ.get()
assigned_ip, preempt_job_id = match(job, my_node.resources, my_node.running_jobs)
assigned_ip, preempt_job_id = match(job, my_node.resources, my_node.running_jobs, my_node.lost_resources)

if assigned_ip and not preempt_job_id:
msg = Message('EXEC_JOB',content=job)
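For orientation, here is a small self-contained sketch of how the cost model above ranks candidate nodes. It is illustrative only: the coefficients and field names mirror the diff, but the node resource numbers, IPs, and the job_mem requirement are made-up values, and the helper below is a simplified stand-in rather than the project's actual matchmaker.

# Illustrative sketch: scoring two hypothetical nodes with the matchmaker's
# cost model. Lower total cost wins; staying on the job's source node avoids
# the migration penalty, and more free memory after placement lowers the
# free-memory term.
COEFF_MIGRATION_COST = 5
COEFF_CPU_USAGE = 5
COEFF_AVG_PROC_LOAD = 5
COEFF_FREE_MEM = 2000

def calc_matching_score(resources, num_running_jobs, is_source, mem):
    return ((1 - is_source) * COEFF_MIGRATION_COST
            + resources['cpu_usage'] * COEFF_CPU_USAGE
            + resources['process_load'] * COEFF_AVG_PROC_LOAD
            + (1.0 / (resources['memory'] - mem)) * COEFF_FREE_MEM)

job_mem = 512           # MB requested by the job (hypothetical)
source_ip = '10.0.0.2'  # node the job was submitted from (hypothetical)

nodes = {
    '10.0.0.1': {'cpu_usage': 0.2, 'process_load': 0.5, 'memory': 4096},
    '10.0.0.2': {'cpu_usage': 0.7, 'process_load': 1.5, 'memory': 2048},
}

scores = {ip: calc_matching_score(res, 0, ip == source_ip, job_mem)
          for ip, res in nodes.items()}
best_ip = min(scores, key=scores.get)
print(scores)    # per-node costs
print(best_ip)   # node the job would be assigned to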
136 changes: 51 additions & 85 deletions flamingo/core/messages/handlers.py
@@ -1,30 +1,13 @@
from .message import Message
from .utils import send_msg, get_resources
from .utils import send_msg, get_job_status, start_job, \
get_random_alive_node, send_file, exec_new_job, add_log
import time
from .utils import send_file
import os
import signal
import random
from . import params
from multiprocessing import Process

def start_job(my_node, job_id, recv_ip):
print("Starting job")
cmd = "./executable < input > " + "../../" + params.LOG_DIR + "/" + job_id
print(cmd)

exec_p = Process(target = exec_new_job, args = (my_node, job_id, cmd, recv_ip))
exec_p.start()
my_node.job_pid[job_id] = exec_p.pid
print("job pid added")
print(my_node.job_pid)


def get_random_alive_node(my_node, not_ip = None):
while 1:
ip = random.choice(my_node.resources.keys())
if ip != not_ip:
return ip

def exec_job_handler(my_node, job):
inp_fp = job.attr['input_path']
@@ -61,36 +44,6 @@ def query_files_handler(my_node, recv_ip, content):
send_file(content[1], to = recv_ip, job_id = job_id, file_ty = "input")
send_file(content[2], to = recv_ip, job_id = job_id, file_ty = "executable")

def exec_new_job(my_node, job_id, cmd, source_ip):
os.chdir(os.path.join(params.EXEC_DIR, job_id))
st_tm = time.time()
os.system(cmd)
end_tm = time.time()

job_run_time = end_tm - st_tm
tat = end_tm - my_node.job_submitted_time[job_id]

print("Completed job")
msg = Message('COMPLETED_JOB', content = [job_id, job_run_time, tat])
send_msg(msg, to = my_node.root_ip)

print(my_node.job_pid)
del my_node.job_pid[job_id]
os.system("rm -rf " + os.path.join(params.EXEC_DIR, job_id))

# After running got completed remove this job from individual_running_jobs
del my_node.individual_running_jobs[job_id]


log_ip = source_ip
if source_ip == my_node.self_ip:
msg = Message('GET_ALIVE_NODE', content = [source_ip, job_id])
send_msg(msg, to = my_node.root_ip)
else:
send_file("../../" + os.path.join(params.LOG_DIR, job_id), to = log_ip, job_id = job_id, file_ty = "log")

# send leader msg to remove this job from running Q

def get_alive_node_handler(my_node, recv_ip, content):
not_ip, job_id = content

@@ -114,10 +67,7 @@ def log_file_handler(my_node, content):
send_msg(msg, to = my_node.root_ip)

def completed_job_handler(my_node, recv_ip, content):
# print(my_node.completed_jobs)
# print(my_node.running_jobs)
job_id, job_run_time, tat = content
# print(my_node.running_jobs)
j = None
for i in range(len(my_node.running_jobs[recv_ip])):
if my_node.running_jobs[recv_ip][i].job_id == job_id:
Expand All @@ -132,14 +82,12 @@ def completed_job_handler(my_node, recv_ip, content):
my_node.completed_jobs[job_id]['turn_around_time'] = tat
my_node.completed_jobs[job_id]['job_run_time'] = job_run_time
my_node.completed_jobs[job_id]['log_file_ip1'] = recv_ip
# print(my_node.completed_jobs)
# print(my_node.running_jobs)


def preempt_and_exec_handler(my_node, to, content):
preempt_pid = my_node.job_pid[content[1]]

os.kill(preempt_pid, signal.SIGKILL)
print("Preempted this job with id : %s in node %s" % (content[1],my_node.self_ip))
add_log(my_node, "Preempted this job with id : %s in node %s" % (content[1],my_node.self_ip), "INFO")
msg = Message('PREEMPTED_JOB',content = [my_node.individual_running_jobs[content[1]]])
send_msg(msg, to = to)
del my_node.individual_running_jobs[content[1]]
@@ -153,21 +101,13 @@ def preempted_job_handler(my_node, recv_addr, content):

def status_job_handler(my_node, recv_addr, content):
jobid = content[0]
reply = "Waiting"
if jobid in my_node.completed_jobs.keys():
reply = "Completed"

# print(my_node.running_jobs)
for key in my_node.running_jobs.keys():
for job in my_node.running_jobs[key]:
if jobid == job.job_id:
reply = "Running"
break

    reply = get_job_status(my_node, jobid)

msg = Message('STATUS_REPLY',content = [jobid, reply])
send_msg(msg, to = recv_addr)

def print_status_reply(my_node, content):
def status_reply_handler(my_node, content):
jobid = content[0]
reply = content[1]
print("Status of job with jobid %s is %s" % (jobid,reply))
@@ -208,7 +148,7 @@ def backup_elect_handler(my_node):
send_msg(msg, to = my_node.backup_ip)

def le_result_handler(my_node):
print(my_node.self_ip, " is the leader")
add_log(my_node, my_node.self_ip + " is the leader", "INFO")
my_node.le_elected = True
my_node.root_ip_dict['ip'] = my_node.self_ip

@@ -226,31 +166,57 @@ def heartbeat_ack_handler(my_node):

my_node.last_jobs_sent = 0

def send_heartbeat(my_node, to):
cur_res = get_resources()
my_node.resources[my_node.self_ip] = cur_res
def display_output_handler(my_node, recv_ip, content):
job_id = content
job_status = get_job_status(my_node, job_id)

jobQ_cp = []
for job_i in my_node.jobQ:
jobQ_cp.append(job_i)
if job_status != "Complete":
        msg = Message('DISPLAY_OUTPUT_ACK', content = [job_id, job_status])
        send_msg(msg, to = recv_ip)
return

msg = Message('HEARTBEAT', content = [jobQ_cp, cur_res])
print(msg.content[0])
print(msg.content[1])
to_addr = None
    req_ip = recv_ip

if ((my_node.completed_jobs[job_id]['log_file_ip1'] == recv_ip or \
my_node.completed_jobs[job_id]['log_file_ip2'] == recv_ip) and \
recv_ip in my_node.resources.keys()):
req_ip = recv_ip
to_addr = recv_ip
elif my_node.completed_jobs[job_id]['log_file_ip1'] in my_node.resources.keys():
to_addr = my_node.completed_jobs[job_id]['log_file_ip1']
elif my_node.completed_jobs[job_id]['log_file_ip2'] in my_node.resources.keys():
to_addr = my_node.completed_jobs[job_id]['log_file_ip2']
    else:
        print("Exception!! Flamingo supports only 1 fault. 2 faults detected")
        return

my_node.last_jobs_sent = len(msg.content[0])
send_msg(msg, to)
msg = Message('FWD_DISPLAY_OUTPUT', content = [req_ip, job_id])
    send_msg(msg, to = to_addr)

def fwd_display_output_handler(my_node, content):
source_ip, job_id = content

send_file(os.path.join(params.LOG_DIR, job_id), to = source_ip, job_id = job_id, file_ty = "fwd_display_output_ack")

def display_output_ack_handler(my_node, content):
print("Job id : %s status : %s; Output can be displayed only after it completes" % (content[0], content[1]))
os.kill(my_node.submit_interface_pid, signal.SIGUSR1)

def fwd_display_output_ack_handler(my_node, content):
job_id, file_ty, file_content = content

print(file_content)
os.kill(my_node.submit_interface_pid, signal.SIGUSR1)

# see for backup

def sleep_and_ping(to):
time.sleep(params.HEARTBEAT_INTERVAL)
msg = Message('ARE_YOU_ALIVE')
send_msg(msg, to)

# both task and resource manager combined
def heartbeat_handler(my_node, recv_ip, content, manager):
# call matchmaker
node_jobQ, node_res = content
my_node.resources[recv_ip] = node_res
my_node.lost_resources[recv_ip] = {'memory' : 0, 'cores' : 0}
my_node.last_heartbeat_ts[recv_ip] = time.time()

for job_i in node_jobQ:
@@ -260,8 +226,8 @@ def heartbeat_handler(my_node, recv_ip, content, manager):
my_node.running_jobs[recv_ip] = manager.list()

# wake up matchmaker
os.system("pgrep -P " + str(os.getpid()))
print(my_node.matchmaker_pid)
# os.system("pgrep -P " + str(os.getpid()))
# print(my_node.matchmaker_pid)
os.kill(my_node.matchmaker_pid, signal.SIGUSR1)

msg = Message('HEARTBEAT_ACK')
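To make the new display-output path easier to follow, here is a hedged sketch of the round trip the handlers above implement. The message names match the diff, but the nodes, job id, and in-memory "transport" below are toy stand-ins for Flamingo's real send_msg/send_file networking.

# Toy walkthrough of DISPLAY_OUTPUT -> FWD_DISPLAY_OUTPUT -> FWD_DISPLAY_OUTPUT_ACK.
# Plain function calls stand in for messages so the control flow is easy to read.

completed_jobs = {'job42': {'log_file_ip1': '10.0.0.3', 'log_file_ip2': '10.0.0.4'}}
alive_nodes = {'10.0.0.3', '10.0.0.5'}                      # 10.0.0.4 is assumed dead
log_store = {('10.0.0.3', 'job42'): "hello from job42\n"}   # (holder ip, job id) -> log text

def leader_display_output(requester_ip, job_id):
    # Leader side: if the job is finished, pick an alive node that holds its log.
    if job_id not in completed_jobs:
        return ('DISPLAY_OUTPUT_ACK', job_id, 'not finished yet')
    info = completed_jobs[job_id]
    holder = next((ip for ip in (info['log_file_ip1'], info['log_file_ip2'])
                   if ip in alive_nodes), None)
    if holder is None:
        raise RuntimeError('Flamingo tolerates only 1 fault; both log copies are gone')
    return ('FWD_DISPLAY_OUTPUT', holder, requester_ip, job_id)

def holder_forward_output(holder_ip, requester_ip, job_id):
    # Holder side: ship the stored log contents back to the requester.
    return ('FWD_DISPLAY_OUTPUT_ACK', job_id, log_store[(holder_ip, job_id)])

reply = leader_display_output('10.0.0.5', 'job42')
if reply[0] == 'FWD_DISPLAY_OUTPUT':
    _, holder, requester, job_id = reply
    _, _, contents = holder_forward_output(holder, requester, job_id)
    print(contents)                       # what the submitting user finally sees
else:
    print('Job %s is %s' % (reply[1], reply[2]))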
