-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest—pythonGetInfo
120 lines (92 loc) · 3.5 KB
/
test—pythonGetInfo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# 配置文件 conf.py
# -*- coding: utf-8 -*-
class db_conf:
host = 'x.x.x.x' # 数据库连接的IP
port = 3306 # 数据库的端口号
user = 'root' # 用户名
passwd = 'xxxxx' # 密码
db = 'mysql' # 库名
class device_conf:
device_name = "x.x.x.x:5555"
ip = 'x.x.x.x'
# 查询过滤top内所需的信息 Devicesinfo.py
# -*- coding: utf-8 -*-
from LinuxInfo.conf import device_conf
from ppadb.client import Client as AdbClient
class DeviceInfoGet:
def __init__(self, device_name=device_conf.device_name):
self.device_name = device_name
def get_cpu_used(self, procName):
"""
查询进程占用的CPU资源
:param procName: 进程名称
:return:
"""
client = AdbClient(host="127.0.0.1", port=5037)
device = client.device(f"{self.device_name}")
# result = device.shell(f"top -n 1 | tail -n +5 ")
result = device.shell(f"top -n 1 | grep {procName} | grep -v grep | awk '{{print($9)}}'")
return result
if __name__ == '__main__':
test = DeviceInfoGet()
test.get_cpu_used("idea_video")
# 主程序,将查询到的数据存入数据库
# -*- coding: utf-8 -*-
import time, pymysql, datetime
from LinuxInfo.conf import db_conf
from LinuxInfo.DevicesInfo import DeviceInfoGet
db = pymysql.connect(
host=db_conf.host,
port=db_conf.port,
user=db_conf.user,
passwd=db_conf.passwd,
charset='utf8'
)
cur = db.cursor() # 生成游标对象
try:
cur.execute("create database device_info_db")
print("$ 创建新的数据库成功!!")
except Exception:
choise = input("$ 数据库 device_info_db 已经存在,是否删除重新创建? (Y/N)\n$ ->:")
if choise.lower() == "y":
cur.execute("drop database device_info_db")
cur.execute("create database device_info_db")
print("$ 删除旧数据库,并重新创建新的数据库(device_info_db)...")
time.sleep(1)
print("$ 创建新的数据库成功!!")
else:
print("$ 继续使用旧的数据库...")
cur.execute("use device_info_db")
try:
cur.execute("create table device_info_table(id int auto_increment primary key,name varchar(20),cpu_used varchar(30),time varchar(50))")
print("$ 创建新的table成功!!")
except Exception:
choise1 = input("table device_info_table 已经存在,是否删除重新创建? (Y/N)\n$ ->:")
if choise1.lower() == "y":
cur.execute("drop table device_info_table")
cur.execute("create table device_info_table(id int auto_increment primary key,name varchar(20),cpu_used varchar(30),time varchar(50))")
print("$ 删除旧table,并重新创建新的table(device_info_table)...")
time.sleep(1)
print("$ 创建新的table成功!!")
else:
print("$ 继续使用旧的 table...")
sql_in = "insert into device_info_table (name,cpu_used,time)values(%s,%s,%s,%s)"
start_time = time.time()
while True:
DeviceInfo = DeviceInfoGet()
cpu_info = DeviceInfo.get_cpu_used("idea_video")
now_time = datetime.datetime.now()
cur.execute(f"insert into device_info_table (name,cpu_used,time)values('video','{cpu_info}%','{now_time}')")
db.commit()
print("保存中....")
time.sleep(5)
end_time = time.time()
if end_time - start_time >= 30:
sel = input("已经抓取30s,是否继续抓取? (Y/N)\n$ ->:")
if sel.lower() == "y":
start_time = time.time()
continue
else:
break
db.close()
cur.close()