Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle re-orgs #81

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/cli/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
#[derive(Clone, Eq, PartialEq, Debug)]
#[command(author, version, about)]
pub struct Args<C: Clone + Eq + Debug + Subcommand, O: DescriptorOpts = DescrStdOpts> {
/// Set verbosity level.
/// Set verbosity level
///
/// Can be used multiple times to increase verbosity.
#[clap(short, long, global = true, action = clap::ArgAction::Count)]
Expand All @@ -53,10 +53,14 @@
#[command(flatten)]
pub resolver: ResolverOpt,

/// Force-sync wallet data with the indexer before performing the operation.
/// Force-sync wallet data with the indexer before performing the operation
#[clap(long, global = true)]
pub sync: bool,

/// Prune old transactions from the cache during the sync operation
#[clap(long, global = true)]
pub prune: bool,

Check warning on line 62 in src/cli/args.rs

View check run for this annotation

Codecov / codecov/patch

src/cli/args.rs#L62

Added line #L62 was not covered by tests

#[command(flatten)]
pub general: GeneralOpts,

Expand All @@ -72,6 +76,7 @@
wallet: self.wallet.clone(),
resolver: self.resolver.clone(),
sync: self.sync,
prune: self.prune,

Check warning on line 79 in src/cli/args.rs

View check run for this annotation

Codecov / codecov/patch

src/cli/args.rs#L79

Added line #L79 was not covered by tests
general: self.general.clone(),
command: cmd.clone(),
}
Expand Down Expand Up @@ -154,7 +159,7 @@
if sync {
let indexer = self.indexer()?;
eprint!("Syncing");
if let Some(errors) = wallet.update(&indexer).into_err() {
if let Some(errors) = wallet.update(&indexer, self.prune).into_err() {

Check warning on line 162 in src/cli/args.rs

View check run for this annotation

Codecov / codecov/patch

src/cli/args.rs#L162

Added line #L162 was not covered by tests
eprintln!(" partial, some requests has failed:");
for err in errors {
eprintln!("- {err}");
Expand Down
4 changes: 2 additions & 2 deletions src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@
if *publish {
let indexer = self.indexer()?;
eprint!("Publishing transaction via {} ... ", indexer.name());
indexer.publish(&tx)?;
indexer.broadcast(&tx)?;

Check warning on line 326 in src/cli/command.rs

View check run for this annotation

Codecov / codecov/patch

src/cli/command.rs#L326

Added line #L326 was not covered by tests
eprintln!("success");
}
}
Expand All @@ -343,7 +343,7 @@
if *publish {
let indexer = self.indexer()?;
eprint!("Publishing transaction via {} ... ", indexer.name());
indexer.publish(&tx)?;
indexer.broadcast(&tx)?;

Check warning on line 346 in src/cli/command.rs

View check run for this annotation

Codecov / codecov/patch

src/cli/command.rs#L346

Added line #L346 was not covered by tests
eprintln!("success");
}
}
Expand Down
47 changes: 8 additions & 39 deletions src/indexers/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,63 +72,32 @@
impl Indexer for AnyIndexer {
type Error = AnyIndexerError;

fn create<K, D: Descriptor<K>, L2: Layer2>(
&self,
descr: &WalletDescr<K, D, L2::Descr>,
) -> MayError<WalletCache<L2::Cache>, Vec<Self::Error>> {
match self {
#[cfg(feature = "electrum")]
AnyIndexer::Electrum(inner) => {
let result = inner.create::<K, D, L2>(descr);
MayError {
ok: result.ok,
err: result.err.map(|v| v.into_iter().map(|e| e.into()).collect()),
}
}
#[cfg(feature = "esplora")]
AnyIndexer::Esplora(inner) => {
let result = inner.create::<K, D, L2>(descr);
MayError {
ok: result.ok,
err: result.err.map(|v| v.into_iter().map(|e| e.into()).collect()),
}
}
#[cfg(feature = "mempool")]
AnyIndexer::Mempool(inner) => {
let result = inner.create::<K, D, L2>(descr);
MayError {
ok: result.ok,
err: result.err.map(|v| v.into_iter().map(|e| e.into()).collect()),
}
}
}
}

fn update<K, D: Descriptor<K>, L2: Layer2>(
&self,
descr: &WalletDescr<K, D, L2::Descr>,
cache: &mut WalletCache<L2::Cache>,
prune: bool,

Check warning on line 79 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L79

Added line #L79 was not covered by tests
) -> MayError<usize, Vec<Self::Error>> {
match self {
#[cfg(feature = "electrum")]
AnyIndexer::Electrum(inner) => {
let result = inner.update::<K, D, L2>(descr, cache);
let result = inner.update::<K, D, L2>(descr, cache, prune);

Check warning on line 84 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L84

Added line #L84 was not covered by tests
MayError {
ok: result.ok,
err: result.err.map(|v| v.into_iter().map(|e| e.into()).collect()),
}
}
#[cfg(feature = "esplora")]
AnyIndexer::Esplora(inner) => {
let result = inner.update::<K, D, L2>(descr, cache);
let result = inner.update::<K, D, L2>(descr, cache, prune);

Check warning on line 92 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L92

Added line #L92 was not covered by tests
MayError {
ok: result.ok,
err: result.err.map(|v| v.into_iter().map(|e| e.into()).collect()),
}
}
#[cfg(feature = "mempool")]
AnyIndexer::Mempool(inner) => {
let result = inner.update::<K, D, L2>(descr, cache);
let result = inner.update::<K, D, L2>(descr, cache, prune);

Check warning on line 100 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L100

Added line #L100 was not covered by tests
MayError {
ok: result.ok,
err: result.err.map(|v| v.into_iter().map(|e| e.into()).collect()),
Expand All @@ -137,14 +106,14 @@
}
}

fn publish(&self, tx: &Tx) -> Result<(), Self::Error> {
fn broadcast(&self, tx: &Tx) -> Result<(), Self::Error> {

Check warning on line 109 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L109

Added line #L109 was not covered by tests
match self {
#[cfg(feature = "electrum")]
AnyIndexer::Electrum(inner) => inner.publish(tx).map_err(|e| e.into()),
AnyIndexer::Electrum(inner) => inner.broadcast(tx).map_err(|e| e.into()),

Check warning on line 112 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L112

Added line #L112 was not covered by tests
#[cfg(feature = "esplora")]
AnyIndexer::Esplora(inner) => inner.publish(tx).map_err(|e| e.into()),
AnyIndexer::Esplora(inner) => inner.broadcast(tx).map_err(|e| e.into()),

Check warning on line 114 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L114

Added line #L114 was not covered by tests
#[cfg(feature = "mempool")]
AnyIndexer::Mempool(inner) => inner.publish(tx).map_err(|e| e.into()),
AnyIndexer::Mempool(inner) => inner.broadcast(tx).map_err(|e| e.into()),

Check warning on line 116 in src/indexers/any.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/any.rs#L116

Added line #L116 was not covered by tests
}
}
}
29 changes: 20 additions & 9 deletions src/indexers/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::mem;
use std::num::NonZeroU32;
use std::str::FromStr;

Expand Down Expand Up @@ -63,21 +64,21 @@
impl Indexer for Client {
type Error = ElectrumError;

fn create<K, D: Descriptor<K>, L2: Layer2>(
&self,
descriptor: &WalletDescr<K, D, L2::Descr>,
) -> MayError<WalletCache<L2::Cache>, Vec<Self::Error>> {
let mut cache = WalletCache::new_nonsync();
self.update::<K, D, L2>(descriptor, &mut cache).map(|_| cache)
}

fn update<K, D: Descriptor<K>, L2: Layer2>(
&self,
descriptor: &WalletDescr<K, D, L2::Descr>,
cache: &mut WalletCache<L2::Cache>,
prune: bool,

Check warning on line 71 in src/indexers/electrum.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/electrum.rs#L71

Added line #L71 was not covered by tests
) -> MayError<usize, Vec<Self::Error>> {
let mut errors = Vec::<ElectrumError>::new();

// First, we scan all addresses.
// Addresses may be re-used, so known transactions doesn't help here.
// We collect these transactions, which contain the most recent information, into a new
// cache. We remove old transaction, since its data are now updated (for instance, if a
// transaction was re-orged, it may have a different height).

let mut old_cache = mem::take(&mut cache.tx);

Check warning on line 81 in src/indexers/electrum.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/electrum.rs#L75-L81

Added lines #L75 - L81 were not covered by tests
let mut address_index = BTreeMap::new();
for keychain in descriptor.keychains() {
let mut empty_count = 0usize;
Expand All @@ -104,6 +105,7 @@

empty_count = 0;

// TODO: Separate as `WalletTx::from_electrum_history` method.

Check warning on line 108 in src/indexers/electrum.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/electrum.rs#L108

Added line #L108 was not covered by tests
let mut process_history_entry =
|hr: GetHistoryRes| -> Result<WalletTx, ElectrumError> {
let txid = hr.tx_hash;
Expand Down Expand Up @@ -202,6 +204,7 @@
for hr in hres {
match process_history_entry(hr) {
Ok(tx) => {
old_cache.remove(&tx.txid);

Check warning on line 207 in src/indexers/electrum.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/electrum.rs#L207

Added line #L207 was not covered by tests
cache.tx.insert(tx.txid, tx);
}
Err(e) => errors.push(e),
Expand Down Expand Up @@ -282,14 +285,22 @@
.insert(wallet_addr.expect_transmute());
}

// The remaining transactions are unmined ones.
if !prune {
for (txid, mut tx) in old_cache {
tx.status = TxStatus::Unknown;
cache.tx.insert(txid, tx);
}
}

Check warning on line 294 in src/indexers/electrum.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/electrum.rs#L289-L294

Added lines #L289 - L294 were not covered by tests

if errors.is_empty() {
MayError::ok(0)
} else {
MayError::err(0, errors)
}
}

fn publish(&self, tx: &Tx) -> Result<(), Self::Error> {
fn broadcast(&self, tx: &Tx) -> Result<(), Self::Error> {

Check warning on line 303 in src/indexers/electrum.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/electrum.rs#L303

Added line #L303 was not covered by tests
self.transaction_broadcast(tx)?;
Ok(())
}
Expand Down
32 changes: 22 additions & 10 deletions src/indexers/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::mem;
use std::num::NonZeroU32;
use std::ops::{Deref, DerefMut};

Expand Down Expand Up @@ -193,21 +194,21 @@
impl Indexer for Client {
type Error = Error;

fn create<K, D: Descriptor<K>, L2: Layer2>(
&self,
descriptor: &WalletDescr<K, D, L2::Descr>,
) -> MayError<WalletCache<L2::Cache>, Vec<Self::Error>> {
let mut cache = WalletCache::new_nonsync();
self.update::<K, D, L2>(descriptor, &mut cache).map(|_| cache)
}

fn update<K, D: Descriptor<K>, L2: Layer2>(
&self,
descriptor: &WalletDescr<K, D, L2::Descr>,
cache: &mut WalletCache<L2::Cache>,
prune: bool,

Check warning on line 201 in src/indexers/esplora.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/esplora.rs#L201

Added line #L201 was not covered by tests
) -> MayError<usize, Vec<Self::Error>> {
let mut errors = vec![];

// First, we scan all addresses.
// Addresses may be re-used, so known transactions doesn't help here.
// We collect these transactions, which contain the most recent information, into a new
// cache. We remove old transaction, since its data are now updated (for instance, if a
// transaction was re-orged, it may have a different height).

let mut old_cache = mem::take(&mut cache.tx);

Check warning on line 211 in src/indexers/esplora.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/esplora.rs#L205-L211

Added lines #L205 - L211 were not covered by tests
let mut address_index = BTreeMap::new();
for keychain in descriptor.keychains() {
let mut empty_count = 0usize;
Expand All @@ -232,7 +233,10 @@
}
Ok(txes) => {
empty_count = 0;
txids = txes.iter().map(|tx| tx.txid).collect();
txids.extend(txes.iter().map(|tx| tx.txid));
for txid in &txids {
old_cache.remove(txid);
}

Check warning on line 239 in src/indexers/esplora.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/esplora.rs#L236-L239

Added lines #L236 - L239 were not covered by tests
cache
.tx
.extend(txes.into_iter().map(WalletTx::from).map(|tx| (tx.txid, tx)));
Expand Down Expand Up @@ -313,12 +317,20 @@
.insert(wallet_addr.expect_transmute());
}

// The remaining transactions are unmined ones.
if !prune {
for (txid, mut tx) in old_cache {
tx.status = TxStatus::Unknown;
cache.tx.insert(txid, tx);
}
}

Check warning on line 326 in src/indexers/esplora.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/esplora.rs#L321-L326

Added lines #L321 - L326 were not covered by tests

if errors.is_empty() {
MayError::ok(0)
} else {
MayError::err(0, errors)
}
}

fn publish(&self, tx: &Tx) -> Result<(), Self::Error> { self.inner.broadcast(tx) }
fn broadcast(&self, tx: &Tx) -> Result<(), Self::Error> { self.inner.broadcast(tx) }

Check warning on line 335 in src/indexers/esplora.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/esplora.rs#L335

Added line #L335 was not covered by tests
}
12 changes: 10 additions & 2 deletions src/indexers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,21 @@
fn create<K, D: Descriptor<K>, L2: Layer2>(
&self,
descr: &WalletDescr<K, D, L2::Descr>,
) -> MayError<WalletCache<L2::Cache>, Vec<Self::Error>>;
) -> MayError<WalletCache<L2::Cache>, Vec<Self::Error>> {
let mut cache = WalletCache::new_nonsync();
self.update::<K, D, L2>(descr, &mut cache, true).map(|_| cache)
}

Check warning on line 51 in src/indexers/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/indexers/mod.rs#L48-L51

Added lines #L48 - L51 were not covered by tests

/// Update the wallet transaction cache and balances
///
/// If `prune` argument is set, removes all transactions which are not present in blockchain or
/// mempool (not known to the indexer, i.e. has `TxStatus::Unknown`).
fn update<K, D: Descriptor<K>, L2: Layer2>(
&self,
descr: &WalletDescr<K, D, L2::Descr>,
cache: &mut WalletCache<L2::Cache>,
prune: bool,
) -> MayError<usize, Vec<Self::Error>>;

fn publish(&self, tx: &Tx) -> Result<(), Self::Error>;
fn broadcast(&self, tx: &Tx) -> Result<(), Self::Error>;
}
15 changes: 11 additions & 4 deletions src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

use crate::{
BlockInfo, CoinRow, Indexer, Layer2, Layer2Cache, Layer2Data, Layer2Descriptor, Layer2Empty,
MayError, MiningInfo, NoLayer2, Party, TxRow, WalletAddr, WalletTx, WalletUtxo,
MayError, MiningInfo, NoLayer2, Party, TxRow, TxStatus, WalletAddr, WalletTx, WalletUtxo,
};

#[derive(Copy, Clone, Eq, PartialEq, Debug, Display, Error)]
Expand Down Expand Up @@ -331,12 +331,16 @@
&mut self,
descriptor: &WalletDescr<K, D, L2::Descr>,
indexer: &I,
prune: bool,

Check warning on line 334 in src/wallet.rs

View check run for this annotation

Codecov / codecov/patch

src/wallet.rs#L334

Added line #L334 was not covered by tests
) -> MayError<usize, Vec<I::Error>> {
let res = indexer.update::<K, D, L2>(descriptor, self);
let res = indexer.update::<K, D, L2>(descriptor, self, prune);

Check warning on line 336 in src/wallet.rs

View check run for this annotation

Codecov / codecov/patch

src/wallet.rs#L336

Added line #L336 was not covered by tests
self.mark_dirty();
res
}

/// Prunes transaction cache by removing all transactions with `TxStatus::Unknown`
pub fn prune(&mut self) { self.tx.retain(|_, tx| tx.status != TxStatus::Unknown) }

Check warning on line 342 in src/wallet.rs

View check run for this annotation

Codecov / codecov/patch

src/wallet.rs#L342

Added line #L342 was not covered by tests

pub fn addresses_on(&self, keychain: Keychain) -> &BTreeSet<WalletAddr> {
self.addr.get(&keychain).unwrap_or_else(|| {
panic!("keychain #{keychain} is not supported by the wallet descriptor")
Expand Down Expand Up @@ -543,10 +547,13 @@
res
}

pub fn update<I: Indexer>(&mut self, indexer: &I) -> MayError<(), Vec<I::Error>> {
self.cache.update::<I, K, D, L2>(&self.descr, indexer).map(|_| ())
pub fn update<I: Indexer>(&mut self, indexer: &I, prune: bool) -> MayError<(), Vec<I::Error>> {
self.cache.update::<I, K, D, L2>(&self.descr, indexer, prune).map(|_| ())

Check warning on line 551 in src/wallet.rs

View check run for this annotation

Codecov / codecov/patch

src/wallet.rs#L550-L551

Added lines #L550 - L551 were not covered by tests
}

/// Prunes transaction cache by removing all transactions with `TxStatus::Unknown`
pub fn prune(&mut self) { self.cache.prune() }

Check warning on line 555 in src/wallet.rs

View check run for this annotation

Codecov / codecov/patch

src/wallet.rs#L555

Added line #L555 was not covered by tests

pub fn to_deriver(&self) -> D
where
D: Clone,
Expand Down
Loading