-
Notifications
You must be signed in to change notification settings - Fork 617
/
testPyBtcWalletRecovery.py
157 lines (129 loc) · 6.55 KB
/
testPyBtcWalletRecovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import sys
sys.path.append('..')
from pytest.Tiab import TiabTest
from CppBlockUtils import SecureBinaryData, CryptoECDSA, CryptoAES
from armoryengine.PyBtcAddress import PyBtcAddress
from armoryengine.ArmoryUtils import *
from armoryengine.BinaryUnpacker import BinaryUnpacker
from armoryengine.PyBtcWallet import PyBtcWallet
from armoryengine.PyBtcWalletRecovery import PyBtcWalletRecovery, RECOVERMODE
import unittest
import os
from armoryengine.ArmoryUtils import SECP256K1_ORDER, binary_to_int, BIGENDIAN
class PyBtcWalletRecoveryTest(TiabTest):
    """Exercise PyBtcWalletRecovery against a deliberately corrupted wallet.

    setUp() writes a wallet file containing several kinds of damage (see
    buildCorruptWallet); the test then runs a full recovery on it and checks
    the error report, and finally re-runs recovery on the recovered wallet.
    """

    # Show the compared values in addition to the custom message whenever an
    # assertEqual below fails (unittest only does this when longMessage is
    # True).
    longMessage = True

    # Passphrase used both to create the corrupt wallet and to recover it.
    PASSPHRASE = 'testing'

    def setUp(self):
        """Create the corrupt wallet file and remember its wallet ID."""
        self.corruptWallet = 'corrupt_wallet.wallet'
        self.wltID = self.buildCorruptWallet(self.corruptWallet)

    def tearDown(self):
        """Remove every wallet file this test may have left on disk.

        Each file is removed only if it exists, so a test that fails before
        the recovered wallet is written still gets the other files cleaned
        up instead of aborting on the first missing path.
        """
        leftovers = (
            self.corruptWallet,
            # strip the 7-character '.wallet' suffix to build the backup name
            self.corruptWallet[:-7] + '_backup.wallet',
            'armory_%s_RECOVERED.wallet' % self.wltID,
            'armory_%s_RECOVERED_backup.wallet' % self.wltID,
        )
        for path in leftovers:
            if os.path.exists(path):
                os.unlink(path)

    def buildCorruptWallet(self, walletPath):
        """Write a wallet to walletPath with several injected inconsistencies.

        The damage is applied in stages, refilling the address pool after
        each stage:
          1. the public key of the last computed address is overwritten;
          2. an address with chainIndex 250 is inserted (a sequence gap);
          3. the last computed address gets a different private key and is
             locked with the wallet KDF key (inconsistent encryption).

        Returns the wallet's uniqueIDB58.
        """
        crpWlt = PyBtcWallet()
        crpWlt.createNewWallet(walletPath, securePassphrase=self.PASSPHRASE,
                               doRegisterWithBDM=False)

        # not registering with the BDM, have to fill the wallet address pool
        # manually
        crpWlt.fillAddressPool(100)

        # grab the last computed address
        forkedAddr = crpWlt.addrMap[crpWlt.lastComputedChainAddr160]

        # corrupt the pubkey
        pubKey = hex_to_binary(
            '0478d430274f8c5ec1321338151e9f27f4c676a008bdf8638d07c0b6be9ab35c'
            '71a1518063243acd4dfe96b66e3f2ec8013c8e072cd09b3834a19f81f659cc34'
            '55')
        forkedAddr.binPublicKey65 = SecureBinaryData(pubKey)
        crpWlt.addrMap[crpWlt.lastComputedChainAddr160] = forkedAddr

        crpWlt.fillAddressPool(200)

        # insert a gap and inconsistent encryption
        gapAddr = PyBtcAddress()
        gapAddr.chaincode = forkedAddr.chaincode
        gapAddr.chainIndex = 250
        privKey = hex_to_binary(
            'e3b0c44298fc1c149afbf4c8996fb92427ae41e5978fe51ca495991b7852b855')
        gapAddr.binPrivKey32_Plain = SecureBinaryData(privKey)
        gapAddr.binPublicKey65 = CryptoECDSA().ComputePublicKey(
            gapAddr.binPrivKey32_Plain)
        gapAddr.addrStr20 = gapAddr.binPublicKey65.getHash160()
        gapAddr.isInitialized = True

        crpWlt.addrMap[gapAddr.addrStr20] = gapAddr
        crpWlt.lastComputedChainAddr160 = gapAddr.addrStr20

        crpWlt.fillAddressPool(250)

        # replace the key pair of the last computed address and lock it
        encAddr = crpWlt.addrMap[crpWlt.lastComputedChainAddr160]
        privKey = hex_to_binary(
            'e3b0c44298fc1c149afbf4c8996fb92427ae41e5978fe51ca495991b00000000')
        encAddr.binPrivKey32_Plain = SecureBinaryData(privKey)
        encAddr.binPublicKey65 = CryptoECDSA().ComputePublicKey(
            encAddr.binPrivKey32_Plain)
        encAddr.keyChanged = True
        crpWlt.kdfKey = crpWlt.kdf.DeriveKey(SecureBinaryData(self.PASSPHRASE))
        encAddr.lock(secureKdfOutput=crpWlt.kdfKey)
        encAddr.useEncryption = True

        crpWlt.fillAddressPool(350)

        # TODO: corrupt a private key
        #       break an address entry at binary level

        return crpWlt.uniqueIDB58

    def testWalletRecovery(self):
        """Recover the corrupt wallet, then verify the recovered wallet.

        Checks the per-category error counts reported for the corrupt
        wallet, verifies that each reported 'privMult' value multiplied by
        an HMAC of the root private key equals the private key stored in the
        wallet, and finally checks the report produced by running recovery
        on the recovered wallet file.
        """
        # local import, as in the original test; only needed for sorting
        import operator

        # run recovery on broken wallet
        recThread = PyBtcWalletRecovery().RecoverWallet(
            self.corruptWallet, self.PASSPHRASE, RECOVERMODE.Full,
            returnError='Dict')
        recThread.join()
        brkWltResult = recThread.output

        self.assertEqual(len(brkWltResult['sequenceGaps']), 1,
                         "Sequence Gap Undetected")
        self.assertEqual(len(brkWltResult['forkedPublicKeyChain']), 3,
                         "Address Chain Forks Undetected")
        self.assertEqual(len(brkWltResult['unmatchedPair']), 100,
                         "Unmatched Priv/Pub Key Undetected")
        self.assertEqual(len(brkWltResult['misc']), 50,
                         "Wallet Encryption Inconsistency Undetected")
        self.assertEqual(len(brkWltResult['importedErr']), 50,
                         "Unexpected Errors Found")
        self.assertEqual(brkWltResult['nErrors'], 204,
                         "Unexpected Errors Found")

        # check obfuscated keys yield the valid key
        # grab root key
        badWlt = PyBtcWallet()
        badWlt.readWalletFile(self.corruptWallet, False)
        rootAddr = badWlt.addrMap['ROOT']

        securePassphrase = SecureBinaryData(self.PASSPHRASE)
        secureKdfOutput = badWlt.kdf.DeriveKey(securePassphrase)

        # HMAC Q: try successive nonces until the HMAC, read as a big-endian
        # integer, is below the secp256k1 group order
        rootAddr.unlock(secureKdfOutput)
        Q = rootAddr.binPrivKey32_Plain.toBinStr()

        nonce = 0
        while True:
            hmacQ = HMAC256(Q, 'LogMult%d' % nonce)
            if binary_to_int(hmacQ, BIGENDIAN) < SECP256K1_ORDER:
                hmacQ = SecureBinaryData(hmacQ)
                break
            nonce += 1

        # Bad Private Keys: all wallet addresses ordered by chain index
        badKeys = sorted(badWlt.addrMap.values(),
                         key=operator.attrgetter('chainIndex'))

        # NOTE(review): privMult entry i is compared against the address at
        # sorted position i + 201; presumably that is where the forked chain
        # starts in this fixture - confirm against buildCorruptWallet.
        firstBadKeyPos = 201

        # run through obfsPrivKey
        privMult = brkWltResult['privMult']
        for i in range(len(privMult)):
            obfsPKey = SecureBinaryData(hex_to_binary(privMult[i]))
            pKey = CryptoECDSA().ECMultiplyScalars(obfsPKey.toBinStr(),
                                                   hmacQ.toBinStr())

            badKey = badKeys[i + firstBadKeyPos]
            try:
                badKey.unlock(secureKdfOutput)
            except Exception:
                # Deliberate best-effort: addresses that cannot be unlocked
                # are skipped. Narrowed from a bare except so that
                # KeyboardInterrupt/SystemExit still propagate.
                continue

            self.assertEqual(binary_to_hex(pKey),
                             badKey.binPrivKey32_Plain.toHexStr(),
                             'key mult error')

        # run recovery on recovered wallet
        recThread = PyBtcWalletRecovery().RecoverWallet(
            'armory_%s_RECOVERED.wallet' % self.wltID,
            self.PASSPHRASE, RECOVERMODE.Full,
            returnError='Dict')
        recThread.join()
        rcvWltResult = recThread.output

        self.assertEqual(len(rcvWltResult['importedErr']), 50,
                         "Unexpected Errors Found")
        self.assertEqual(rcvWltResult['nErrors'], 50,
                         "Unexpected Errors Found")
        self.assertEqual(len(rcvWltResult['negativeImports']), 99,
                         "Missing neg Imports")
# Running tests with "python <module name>" will NOT work for any Armory tests
# You must run tests with "python -m unittest <module name>" or run all tests with "python -m unittest discover"
# if __name__ == "__main__":
# unittest.main()