Skip to content

Commit

Permalink
5.4.2, improve performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-X-Net committed Jul 17, 2023
1 parent 5b46995 commit fedb4ed
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 25 deletions.
37 changes: 32 additions & 5 deletions code/default/lib/noarch/front_base/http2_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def __init__(self,
self.task = task
self.state = STATE_IDLE
self.get_head_time = None
self.start_connection_point = self.connection._sock.bytes_received
self.get_head_stream_num = 0

# There are two flow control windows: one for data we're sending,
# one for data being sent to us.
Expand Down Expand Up @@ -292,6 +294,7 @@ def receive_frame(self, frame):
self.response_header_datas = None

self.get_head_time = time.time()
self.get_head_stream_num = len(self.connection.streams)

length = self.response_headers.get("Content-Length", None)
if isinstance(length, list):
Expand All @@ -305,14 +308,38 @@ def receive_frame(self, frame):
if self.config.http2_show_debug:
self.logger.debug("%s Closing remote side of stream:%d", self.connection.ssl_sock.ip_str, self.stream_id)

xcost = self.response_headers.get("X-Cost", -1)
if isinstance(xcost, list):
xcost = float(xcost[0])
rcost = self.response_headers.get("R-Cost", -1)
if isinstance(rcost, list):
rcost = float(rcost[0])

time_now = time.time()
time_cost = time_now - self.get_head_time
if time_cost > 0 and \
isinstance(self.task.content_length, int) and \
not self.task.finished:
speed = self.task.content_length / time_cost
whole_cost = time_now - self.start_time
receive_cost = time_now - self.get_head_time
bytes_received = self.connection._sock.bytes_received - self.start_connection_point
if receive_cost > 0 and bytes_received > 10000 and not self.task.finished and receive_cost > 0.001:
# speed = bytes_received / receive_cost
speed = (len(self.request_body) + bytes_received) / (whole_cost - xcost)
self.connection.update_speed(speed)
self.task.set_state("h2_finish[SP:%d]" % speed)

send_cost = len(self.request_body) / speed
streams_cost = ((self.connection.max_payload /2) * self.get_head_stream_num) / speed

if xcost >= 0:
rtt = whole_cost - xcost - send_cost - receive_cost # - streams_cost
if self.config.http2_show_debug:
self.logger.debug("%s RTT:%f rtt:%f send_len:%d recv_len:%d "
"whole_Cost:%f xcost:%f rcost:%f send_cost:%f recv_cost:%f "
"streams_cost:%f Speed: %f",
self.connection.ssl_sock.ip_str,
self.connection.rtt * 1000, rtt * 1000,
len(self.request_body), bytes_received,
whole_cost, xcost, rcost, send_cost, receive_cost, streams_cost, speed)
self.connection.update_rtt(rtt)

self._close_remote()

self.close("end stream")
Expand Down
50 changes: 37 additions & 13 deletions code/default/lib/noarch/front_base/http_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,15 @@ def finish(self):


class HttpWorker(object):
max_payload = 128 * 1024

def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb, idle_cb, log_debug_data):
self.logger = logger
self.ip_manager = ip_manager
self.config = config
self.ssl_sock = ssl_sock
self.rtt = ssl_sock.handshake_time
self.speed = 200
self.rtt = ssl_sock.handshake_time * 0.001
self.speed = 5000000
self.ip_str = ssl_sock.ip_str
self.close_cb = close_cb
self.retry_task_cb = retry_task_cb
Expand All @@ -195,7 +197,8 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
self.keep_running = True
self.processed_tasks = 0
self.continue_fail_tasks = 0
self.speed_history = []
self.rtt_history = [self.rtt,]
self.speed_history = [self.speed, self.speed, self.speed]
self.last_recv_time = self.ssl_sock.create_time
self.last_send_time = self.ssl_sock.create_time
self.life_end_time = self.ssl_sock.create_time + \
Expand All @@ -207,6 +210,7 @@ def __str__(self):
o += " running: %s\r\n" % (self.keep_running)
o += " processed_tasks: %d\r\n" % (self.processed_tasks)
o += " continue_fail_tasks: %s\r\n" % (self.continue_fail_tasks)
o += " rtt_history: %s\r\n" % (self.rtt_history)
o += " speed_history: %s\r\n" % (self.speed_history)
if self.version != "1.1":
o += "streams: %d\r\n" % len(self.streams)
Expand All @@ -215,13 +219,26 @@ def __str__(self):
o += " score: %f\r\n" % (self.get_score())
return o

def update_rtt(self, rtt):
self.rtt_history.append(rtt)
if len(self.rtt_history) > 10:
self.rtt_history.pop(0)
# self.rtt = sum(self.rtt_history) / len(self.rtt_history)

def update_speed(self, speed):
self.speed_history.append(speed)
if len(self.speed_history) > 10:
self.speed_history.pop(0)
self.speed = sum(self.speed_history) / len(self.speed_history)

def update_debug_data(self, rtt, sent, received, speed):
self.rtt = rtt
if sent + received > 10000:
self.speed_history.append(speed)
if len(self.speed_history) > 10:
self.speed_history.pop(0)
self.speed = sum(self.speed_history) / len(self.speed_history)
# if sent + received > 10000:
# self.speed_history.append(speed)
# if len(self.speed_history) > 10:
# self.speed_history.pop(0)
# self.speed = sum(self.speed_history) / len(self.speed_history)
# else:
# self.rtt = rtt

self.log_debug_data(rtt, sent, received)

Expand All @@ -243,13 +260,20 @@ def close(self, reason):

def get_score(self):
# The smaller, the better
score = (50 - (self.speed/6.0)) + (self.rtt/20.0)

score = self.rtt
if self.version != "1.1":
score += len(self.streams) * 3
response_body_len = self.max_payload
for _, stream in self.streams.items():
if stream.response_body_len == 0:
response_body_len += self.max_payload
else:
response_body_len += stream.response_body_len - stream.task.body_len
score += response_body_len / self.speed

if self.config.show_state_debug:
self.logger.debug("get_score %s, speed:%d rtt:%d stream_num:%d score:%d", self.ip_str,
self.speed, self.rtt, len(self.streams), score)
self.logger.debug("get_score %s, speed:%f rtt:%d stream_num:%d score:%f", self.ip_str,
self.speed * 0.000001, self.rtt * 1000, len(self.streams), score)

return score

Expand Down
2 changes: 1 addition & 1 deletion code/default/lib/noarch/front_base/http_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def create_worker_thread(self):
self.connect_all_workers = True

try:
ssl_sock = self.connection_manager.get_ssl_connection(timeout=10)
ssl_sock = self.connection_manager.get_ssl_connection(timeout=60)
except Exception as e:
self._debug_log("create_worker_thread get_ssl_connection fail:%r", e)
ssl_sock = None
Expand Down
4 changes: 4 additions & 0 deletions code/default/lib/noarch/hyper/common/bufsocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ def __init__(self, sck, buffer_size=1000):
# The number of bytes in the buffer.
self._bytes_in_buffer = 0

# record all bytes received from beginning
self.bytes_received = 0

# following is define for send buffer
# all send will be cache and send when flush called,
# combine data to reduce the api call
Expand Down Expand Up @@ -247,6 +250,7 @@ def recv(self, amt):
if not count and amt > self._bytes_in_buffer:
raise ConnectionResetError()
self._bytes_in_buffer += count
self.bytes_received += count

# Read out the bytes and update the index.
amt = min(amt, self._bytes_in_buffer)
Expand Down
2 changes: 1 addition & 1 deletion code/default/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.4.0
5.4.2
4 changes: 4 additions & 0 deletions code/default/x_tunnel/local/direct_front.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def request(method, host, schema="http", path="/", headers={}, data="", timeout=
return response.text, response.status, response


def start():
pass


def stop():
pass

Expand Down
7 changes: 4 additions & 3 deletions code/default/x_tunnel/local/proxy_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ def normal_round_trip_worker(self, work_id):

request_session_id = self.session_id
upload_data_head = struct.pack("<cBB8sIBIH", magic, g.protocol_version, pack_type,
bytes(self.session_id), transfer_no,
utils.to_bytes(self.session_id), transfer_no,
server_timeout, send_data_len, send_ack_len)
upload_post_buf = base_container.WriteBuffer(upload_data_head)
upload_post_buf.append(data)
Expand Down Expand Up @@ -727,12 +727,13 @@ def normal_round_trip_worker(self, work_id):
rtt = max(100, rtt)
speed = (send_data_len + len(content) + 400) / rtt
# speed = (send_data_len + len(content)) / (roundtrip_time - (time_cost / 1000.0))
rcost = response.headers.get(b"R-Cost", b"")
xlog.debug(
"worker:%d no:%d "
"cost_time:%f server_time:%f server_timeout:%d "
"cost_time:%f rcost:%s server_time:%f server_timeout:%d "
"snd:%d rcv:%d s_pool:%d on_road:%d target_worker:%d speed:%d",
work_id, transfer_no,
roundtrip_time, time_cost / 1000.0, server_timeout,
roundtrip_time, rcost, time_cost / 1000.0, server_timeout,
send_data_len, len(content), server_send_pool_size,
self.on_road_num,
self.target_on_roads,
Expand Down
3 changes: 2 additions & 1 deletion code/default/x_tunnel/local/tls_relay_front/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ def __init__(self, fn):
self.set_var("dispather_connect_all_workers_on_startup", 1)

# connect_manager
self.set_var("https_connection_pool_min", 1)
self.set_var("https_connection_pool_min", 0)
self.set_var("max_links_per_ip", 1)
self.set_var("https_connection_pool_max", 20)

# connect_creator
self.set_var("socket_timeout", 2)
self.set_var("connect_force_http2", 1)

# http 2 worker
Expand Down
3 changes: 2 additions & 1 deletion code/default/x_tunnel/local/tls_relay_front/ip_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def get_ip_sni_host(self):
ips = self.host_manager.ips

best_info = None
best_rtt = 2000
best_rtt = 99999

for ip in ips:
info = self._get_ip_info(ip)
Expand Down Expand Up @@ -72,6 +72,7 @@ def report_connect_fail(self, ip_str, sni=None, reason="", force_remove=False):
ip = utils.to_str(ip)
info = self._get_ip_info(ip)
info["fail_times"] += 1
info["rtt"] = 2000
self.logger.debug("ip %s connect fail:%s", ip, reason)

def ssl_closed(self, ip_str, sni=None, reason=""):
Expand Down

0 comments on commit fedb4ed

Please sign in to comment.