From 0c4a5995d6a6fb0ff3d2fd6ac2a0d41fa6ba01bc Mon Sep 17 00:00:00 2001 From: codeskyblue Date: Fri, 5 Mar 2021 13:34:41 +0800 Subject: [PATCH] enhance tidevice info, complete pair function --- README.md | 51 ++++++++++++++++ README_EN.md | 51 ++++++++++++++++ examples/read_properties.py | 36 ++++++++++++ requirements.txt | 2 + tidevice/__main__.py | 19 +++--- tidevice/_device.py | 113 +++++++++++++++++++++++++----------- tidevice/_ssl.py | 90 ++++++++++++++++++++++++++++ tidevice/_usbmux.py | 2 +- 8 files changed, 322 insertions(+), 42 deletions(-) create mode 100644 examples/read_properties.py create mode 100644 tidevice/_ssl.py diff --git a/README.md b/README.md index ec9af23..09d487d 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,57 @@ $ tidevice developer [I 210127 11:37:53 _device:589] DeveloperImage mounted successfully ``` +# 查看设备信息 +```bash +$ tidevice info + +# 查看设备电源信息 +$ tidevice info --domain com.apple.mobile.battery --json +{ + "BatteryCurrentCapacity": 53, + "BatteryIsCharging": true, + "ExternalChargeCapable": true, + "ExternalConnected": true, + "FullyCharged": false, + "GasGaugeCapability": true, + "HasBattery": true +} +``` + +Known domains are: + +```text +com.apple.disk_usage +com.apple.disk_usage.factory +com.apple.mobile.battery +com.apple.iqagent +com.apple.purplebuddy +com.apple.PurpleBuddy +com.apple.mobile.chaperone +com.apple.mobile.third_party_termination +com.apple.mobile.lockdownd +com.apple.mobile.lockdown_cache +com.apple.xcode.developerdomain +com.apple.international +com.apple.mobile.data_sync +com.apple.mobile.tethered_sync +com.apple.mobile.mobile_application_usage +com.apple.mobile.backup +com.apple.mobile.nikita +com.apple.mobile.restriction +com.apple.mobile.user_preferences +com.apple.mobile.sync_data_class +com.apple.mobile.software_behavior +com.apple.mobile.iTunes.SQLMusicLibraryPostProcessCommands +com.apple.mobile.iTunes.accessories +com.apple.mobile.internal +com.apple.mobile.wireless_lockdown +com.apple.fairplay +com.apple.iTunes +com.apple.mobile.iTunes.store +com.apple.mobile.iTunes +``` + ### 其他常用 ```bash # 重启 diff --git a/README_EN.md b/README_EN.md index 876a69b..fad9cb3 100644 --- a/README_EN.md +++ b/README_EN.md @@ -165,6 +165,57 @@ $ tidevice developer [I 210127 11:37:53 _device:589] DeveloperImage mounted successfully ``` +### Check device info +```bash +$ tidevice info + +# check device power info +$ tidevice info --domain com.apple.mobile.battery --json +{ + "BatteryCurrentCapacity": 53, + "BatteryIsCharging": true, + "ExternalChargeCapable": true, + "ExternalConnected": true, + "FullyCharged": false, + "GasGaugeCapability": true, + "HasBattery": true +} +``` + +Known domains are: + +```text +com.apple.disk_usage +com.apple.disk_usage.factory +com.apple.mobile.battery +com.apple.iqagent +com.apple.purplebuddy +com.apple.PurpleBuddy +com.apple.mobile.chaperone +com.apple.mobile.third_party_termination +com.apple.mobile.lockdownd +com.apple.mobile.lockdown_cache +com.apple.xcode.developerdomain +com.apple.international +com.apple.mobile.data_sync +com.apple.mobile.tethered_sync +com.apple.mobile.mobile_application_usage +com.apple.mobile.backup +com.apple.mobile.nikita +com.apple.mobile.restriction +com.apple.mobile.user_preferences +com.apple.mobile.sync_data_class +com.apple.mobile.software_behavior +com.apple.mobile.iTunes.SQLMusicLibraryPostProcessCommands +com.apple.mobile.iTunes.accessories +com.apple.mobile.internal +com.apple.mobile.wireless_lockdown +com.apple.fairplay +com.apple.iTunes +com.apple.mobile.iTunes.store +com.apple.mobile.iTunes +``` + ### Other ```bash # reboot device diff --git a/examples/read_properties.py b/examples/read_properties.py new file mode 100644 index 0000000..59b41a6 --- /dev/null +++ b/examples/read_properties.py @@ -0,0 +1,36 @@ +# coding: utf-8 +# + +from tidevice._usbmux import Usbmux +from tidevice import Device +from pprint import pprint + +def main(): + u = Usbmux() + + # List devices + devices = u.device_list() + pprint(devices) + + buid = u.read_system_BUID() + print("BUID:", buid) + + d = Device() + dev_pkey = d.get_value("DevicePublicKey", no_session=True) + print("DevicePublicKey:", dev_pkey) + + wifi_address = d.get_value("WiFiAddress", no_session=True) + print("WiFi Address:", wifi_address) + + with d.create_inner_connection() as s: + ret = s.send_recv_packet({ + "Request": "GetValue", + "Label": "example", + }) + pprint(ret['Value']) + + # print("Values", values) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 9438233..d78edcf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,8 @@ Pillow requests colored packaging +pyOpenSSL #>=20.0.1 +pyasn1 #>=0.4.8 tornado simple_tornado diff --git a/tidevice/__main__.py b/tidevice/__main__.py index 1c1f4de..5816019 100644 --- a/tidevice/__main__.py +++ b/tidevice/__main__.py @@ -92,24 +92,24 @@ def cmd_list(args: argparse.Namespace): def cmd_device_info(args: argparse.Namespace): d = _udid2device(args.udid) - dinfo = d.device_info() + value = d.get_value(no_session=args.simple, key=args.key, domain=args.domain) if args.json: - def _bytes_hook(obj): if isinstance(obj, bytes): return base64.b64encode(obj).decode() - - print(json.dumps(dinfo, indent=4, default=_bytes_hook)) + print(json.dumps(value, indent=4, default=_bytes_hook)) + elif args.key or args.domain: + pprint(value) else: - print("{:17s} {}".format("ProductName:", - MODELS.get(dinfo['ProductType']))) + print("{:17s} {}".format("MarketName:", + MODELS.get(value['ProductType']))) for attr in ('DeviceName', 'ProductVersion', 'ProductType', 'ModelNumber', 'SerialNumber', 'PhoneNumber', 'CPUArchitecture', 'ProductName', 'ProtocolVersion', 'RegionInfo', 'TimeIntervalSince1970', 'TimeZone', 'UniqueDeviceID', 'WiFiAddress', 'BluetoothAddress', 'BasebandVersion'): - print("{:17s} {}".format(attr + ":", dinfo.get(attr))) + print("{:17s} {}".format(attr + ":", value.get(attr))) def cmd_version(args: argparse.Namespace): @@ -354,7 +354,10 @@ def cmd_test(args: argparse.Namespace): flags=[ dict(args=['--json'], action='store_true', - help="output as json format") + help="output as json format"), + dict(args=['-s', '--simple'], action='store_true', help='use a simple connection to avoid auto-pairing with the device'), + dict(args=['-k', '--key'], type=str, help='only query specified KEY'), + dict(args=['--domain'], help='set domain of query to NAME.'), ], help="show device info"), dict(action=cmd_system_info, diff --git a/tidevice/_device.py b/tidevice/_device.py index d2e1b15..a4b1aac 100644 --- a/tidevice/_device.py +++ b/tidevice/_device.py @@ -11,18 +11,18 @@ import os import pprint import re -import ssl -import sys import shutil +import ssl import subprocess +import sys import tempfile import threading import time import typing import uuid +import zipfile from collections import namedtuple from typing import Iterator, Optional, Tuple, Union -import zipfile import requests from cached_property import cached_property @@ -35,11 +35,12 @@ from ._instruments import (AUXMessageBuffer, DTXMessage, DTXService, Event, ServiceInstruments) from ._ipautil import IPAReader +from ._ssl import make_certs_and_key from ._proto import * from ._safe_socket import * from ._sync import Sync from ._usbmux import Usbmux -from ._utils import ProgressReader, get_app_dir, get_binary_by_name +from ._utils import ProgressReader, get_app_dir from .exceptions import * logger = setup_logger(PROGRAM_NAME, @@ -81,6 +82,7 @@ def __init__(self, self._usbmux = Usbmux(usbmux) elif isinstance(usbmux, Usbmux): self._usbmux = usbmux + self._udid = udid self._info = self.info self._lock = threading.Lock() @@ -119,6 +121,10 @@ def is_connected(self) -> bool: @property def udid(self) -> str: return self._udid + + @property + def devid(self) -> int: + return self._info['DeviceID'] @property def pair_record(self) -> dict: @@ -176,12 +182,49 @@ def pair(self): Same as idevicepair pair iconsole is a github project, hosted in https://github.com/anonymous5l/iConsole """ - iconsole_path = get_binary_by_name("iconsole") - if not os.path.isfile(iconsole_path): - raise MuxError("Unable to pair without iconsole") - output = subprocess.check_output([iconsole_path, 'afc', '-u', self.udid, 'space']).decode('utf-8') - if 'TotalSpace:' not in output: - raise MuxError("Pair: " + output) + device_public_key = self.get_value("DevicePublicKey", no_session=True) + if not device_public_key: + raise MuxError("Unable to retrieve DevicePublicKey") + buid = self._usbmux.read_system_BUID() + wifi_address = self.get_value("WiFiAddress", no_session=True) + + cert_pem, priv_key_pem, dev_cert_pem = make_certs_and_key(device_public_key) + pair_record = { + 'DevicePublicKey': device_public_key, + 'DeviceCertificate': dev_cert_pem, + 'HostCertificate': cert_pem, + 'HostID': str(uuid.uuid4()).upper(), + 'RootCertificate': cert_pem, + 'SystemBUID': buid, + } + + with self.create_inner_connection() as s: + ret = s.send_recv_packet({ + "Request": "Pair", + "PairRecord": pair_record, + "Label": PROGRAM_NAME, + "ProtocolVersion": "2", + "PairingOptions": { + "ExtendedPairingErrors": True, + } + }) + assert ret, "Pair request got empty response" + if "Error" in ret: + # error could be "PasswordProtected" or "PairingDialogResponsePending" + raise MuxError("pair:", ret['Error']) + + assert 'EscrowBag' in ret, ret + pair_record['HostPrivateKey'] = priv_key_pem + pair_record['EscrowBag'] = ret['EscrowBag'] + pair_record['WiFiMACAddress'] = wifi_address + + self.usbmux.send_recv({ + "MessageType": "SavePairRecord", + "PairRecordID": self.udid, + "PairRecordData": bplist.dumps(pair_record), + "DeviceID": self.devid, + }) + return pair_record def handshake(self): """ @@ -191,8 +234,7 @@ def handshake(self): self._pair_record = self._read_pair_record() except MuxReplyError as err: if err.reply_code == UsbmuxReplyCode.BadDevice: - self.pair() - self._pair_record = self._read_pair_record() + self._pair_record = self.pair() @property def ssl_pemfile_path(self): @@ -298,6 +340,8 @@ def create_session(self) -> PlistSocket: s.send_packet({ "Request": "StopSession", + "ProtocolVersion": '2', + "Label": PROGRAM_NAME, "SessionID": session_id, }) s.recv_packet() @@ -307,36 +351,39 @@ def device_info(self, domain: Optional[str] = None) -> dict: Args: domain: can be found in "ideviceinfo -h", eg: com.apple.disk_usage """ - with self.create_session() as conn: - packet = { - "Request": "GetValue", - "Label": PROGRAM_NAME, - } - if domain: - packet["Domain"] = domain - ret = conn.send_recv_packet(packet) - return ret['Value'] - - def get_value(self, key: str, no_session: bool = False): + return self.get_value(domain=domain) + # with self.create_session() as conn: + # packet = { + # "Request": "GetValue", + # "Label": PROGRAM_NAME, + # } + # if domain: + # packet["Domain"] = domain + # ret = conn.send_recv_packet(packet) + # return ret['Value'] + + def get_value(self, key: str = '', domain: str = "", no_session: bool = False): """ key can be: ProductVersion Args: + domain (str): com.apple.disk_usage no_session: set to True when not paired """ + request = { + "Request": "GetValue", + "Label": PROGRAM_NAME, + } + if key: + request['Key'] = key + if domain: + request['Domain'] = domain + if no_session: with self.create_inner_connection() as s: - ret = s.send_recv_packet({ - "Request": "GetValue", - "Key": key, - "Label": PROGRAM_NAME, - }) + ret = s.send_recv_packet(request) return ret['Value'] else: with self.create_session() as conn: - ret = conn.send_recv_packet({ - "Request": "GetValue", - "Key": key, - "Label": PROGRAM_NAME, - }) + ret = conn.send_recv_packet(request) return ret['Value'] def screen_info(self) -> tuple: diff --git a/tidevice/_ssl.py b/tidevice/_ssl.py new file mode 100644 index 0000000..9b8ffb0 --- /dev/null +++ b/tidevice/_ssl.py @@ -0,0 +1,90 @@ +# coding: utf-8 +# +# Referenced from +# - https://github.com/YueChen-C/py-ios-device/blob/10d207c329ae56e25267d5b7386cf551676443b4/ios_device/util/ssl.py +# - https://github.com/anonymous5l/iConsole/blob/dc65f76183feff0d9d897d8506fd70603838da81/tunnel/lockdown.go#L29 + +__all__ = ['make_certs_and_key'] + +import base64 +from datetime import datetime, timedelta + +from OpenSSL.crypto import FILETYPE_PEM as PEM +from OpenSSL.crypto import (TYPE_RSA, X509, PKey, X509Req, dump_certificate, + dump_privatekey, load_publickey) +from pyasn1.codec.der import decoder as der_decoder +from pyasn1.codec.der import encoder as der_encoder +from pyasn1.type import univ + + +def make_certs_and_key(device_public_key: bytes): + """ + 1. create private key + 2. create certificate + """ + device_key = load_publickey(PEM, convert_PKCS1_to_PKCS8_pubkey(device_public_key)) + device_key._only_public = False + + # root key + root_key = PKey() + root_key.generate_key(TYPE_RSA, 2048) + + host_req = make_req(root_key) + host_cert = make_cert(host_req, root_key) + + device_req = make_req(device_key, 'Device') + device_cert = make_cert(device_req, root_key) + + return dump_certificate(PEM, host_cert), dump_privatekey(PEM, root_key), dump_certificate(PEM, device_cert) + + +def convert_PKCS1_to_PKCS8_pubkey(data: bytes) -> bytes: + pubkey_pkcs1_b64 = b''.join(data.split(b'\n')[1:-2]) + pubkey_pkcs1, restOfInput = der_decoder.decode(base64.b64decode(pubkey_pkcs1_b64)) + bit_str = univ.Sequence() + bit_str.setComponentByPosition(0, univ.Integer(pubkey_pkcs1[0])) + bit_str.setComponentByPosition(1, univ.Integer(pubkey_pkcs1[1])) + bit_str = der_encoder.encode(bit_str) + try: + bit_str = ''.join([('00000000'+bin(ord(x))[2:])[-8:] for x in list(bit_str)]) + except Exception: + bit_str = ''.join([('00000000'+bin(x)[2:])[-8:] for x in list(bit_str)]) + bit_str = univ.BitString("'%s'B" % bit_str) + pubkeyid = univ.Sequence() + pubkeyid.setComponentByPosition(0, univ.ObjectIdentifier('1.2.840.113549.1.1.1')) # == OID for rsaEncryption + pubkeyid.setComponentByPosition(1, univ.Null('')) + pubkey_seq = univ.Sequence() + pubkey_seq.setComponentByPosition(0, pubkeyid) + pubkey_seq.setComponentByPosition(1, bit_str) + pubkey = der_encoder.encode(pubkey_seq) + return b'-----BEGIN PUBLIC KEY-----\n' + base64.encodebytes(pubkey) + b'-----END PUBLIC KEY-----\n' + + +def x509_time(**kwargs) -> bytes: + dt = datetime.utcnow() + timedelta(**kwargs) + return dt.strftime('%Y%m%d%H%M%SZ').encode('utf-8') + + +def make_cert(req: X509Req, ca_pkey: PKey) -> X509: + cert = X509() + cert.set_serial_number(1) + cert.set_version(2) + cert.set_subject(req.get_subject()) + cert.set_pubkey(req.get_pubkey()) + cert.set_notBefore(x509_time(minutes=-1)) + cert.set_notAfter(x509_time(days=30)) + # noinspection PyTypeChecker + cert.sign(ca_pkey, 'sha1') + return cert + + +def make_req(pub_key, cn=None, digest=None) -> X509Req: + req = X509Req() + req.set_version(2) + req.set_pubkey(pub_key) + if cn is not None: + subject = req.get_subject() + subject.CN = cn.encode('utf-8') + if digest: + req.sign(pub_key, digest) + return req diff --git a/tidevice/_usbmux.py b/tidevice/_usbmux.py index bc3546e..48d1ac9 100644 --- a/tidevice/_usbmux.py +++ b/tidevice/_usbmux.py @@ -80,7 +80,7 @@ def _check(self, data: dict): if 'Number' in data and data['Number'] != 0: raise MuxReplyError(data['Number']) - def read_system_BUID(self): + def read_system_BUID(self) -> str: """ BUID is always same """ data = self.send_recv({ 'ClientVersionString': 'libusbmuxd 1.1.0',