@@ -10,10 +10,7 @@ use anyhow::anyhow;
10
10
use anyhow:: Context ;
11
11
use bytes:: Buf ;
12
12
use entangler:: { ChunkRange , Config , EntanglementResult , Entangler } ;
13
- use entangler:: { ChunkRange , Config , Entangler } ;
14
13
use entangler_storage:: iroh:: IrohStorage as EntanglerIrohStorage ;
15
- use entangler_storage:: iroh:: { BlobsWrapper , IrohStorage as EntanglerIrohStorage } ;
16
- use fendermint_actor_blobs_shared:: state:: Hash as BlobHash ;
17
14
use fendermint_actor_bucket:: { GetParams , Object } ;
18
15
use fendermint_app_settings:: objects:: ObjectsSettings ;
19
16
use fendermint_rpc:: { client:: FendermintClient , message:: GasParams , QueryClient } ;
@@ -26,7 +23,8 @@ use fvm_shared::{
26
23
use ipc_api:: ethers_address_to_fil_address;
27
24
28
25
use iroh:: NodeAddr ;
29
- use iroh_blobs:: { provider:: AddProgress , rpc:: client:: blobs:: BlobStatus , util:: SetTagOption , Hash } ;
26
+ use iroh_blobs:: hashseq:: HashSeq ;
27
+ use iroh_blobs:: { rpc:: client:: blobs:: BlobStatus , util:: SetTagOption , Hash } ;
30
28
use iroh_manager:: { get_blob_hash_and_size, IrohNode } ;
31
29
use lazy_static:: lazy_static;
32
30
use prometheus:: { register_histogram, register_int_counter, Histogram , IntCounter } ;
87
85
. and( with_iroh( iroh_node. clone( ) ) )
88
86
. and( warp:: multipart:: form( ) . max_length( settings. max_object_size + 1024 * 1024 ) ) // max_object_size + 1MB for form overhead
89
87
. and( with_max_size( settings. max_object_size) )
90
- . and_then( handle_object_upload_with_manager ) ;
88
+ . and_then( handle_object_upload ) ;
91
89
92
90
let objects_download = warp:: path!( "v1" / "objects" / String / ..)
93
91
. and( warp:: path:: tail( ) )
98
96
. and( warp:: query:: <HeightQuery >( ) )
99
97
. and( with_client( client. clone( ) ) )
100
98
. and( with_iroh( iroh_node. clone( ) ) )
101
- . and_then( handle_object_download_with_manager ) ;
99
+ . and_then( handle_object_download ) ;
102
100
103
101
let router = health
104
102
. or( node_addr)
@@ -286,19 +284,6 @@ struct UploadResponse {
286
284
metadata_hash : String ,
287
285
}
288
286
289
- async fn handle_object_upload_with_manager (
290
- mut iroh : IrohManager ,
291
- form_data : warp:: multipart:: FormData ,
292
- max_size : u64 ,
293
- ) -> Result < impl Reply , Rejection > {
294
- let iroh_client = iroh. client ( ) . await . map_err ( |e| {
295
- Rejection :: from ( BadRequest {
296
- message : format ! ( "failed to load iroh client: {}" , e) ,
297
- } )
298
- } ) ?;
299
- handle_object_upload ( iroh_client, form_data, max_size) . await
300
- }
301
-
302
287
async fn handle_object_upload (
303
288
iroh : IrohNode ,
304
289
form_data : warp:: multipart:: FormData ,
@@ -403,7 +388,7 @@ async fn handle_object_upload(
403
388
} ) ?;
404
389
405
390
let hash = * temp_tag. hash ( ) ;
406
- let new_tag = iroh :: blobs :: Tag ( format ! ( "temp-{hash}-{upload_id}" ) . into ( ) ) ;
391
+ let new_tag = iroh_blobs :: Tag ( format ! ( "temp-{hash}-{upload_id}" ) . into ( ) ) ;
407
392
batch. persist_to ( temp_tag, new_tag) . await . map_err ( |e| {
408
393
Rejection :: from ( BadRequest {
409
394
message : format ! ( "failed to persist blob: {}" , e) ,
@@ -412,7 +397,7 @@ async fn handle_object_upload(
412
397
413
398
drop ( batch) ;
414
399
415
- let status = iroh. blobs ( ) . status ( hash) . await . map_err ( |e| {
400
+ let status = iroh. blobs_client ( ) . status ( hash) . await . map_err ( |e| {
416
401
Rejection :: from ( BadRequest {
417
402
message : format ! ( "failed to check blob status: {}" , e) ,
418
403
} )
@@ -471,7 +456,7 @@ async fn handle_object_upload(
471
456
}
472
457
473
458
async fn tag_entangled_data (
474
- iroh : & iroh :: client :: Iroh ,
459
+ iroh : & IrohNode ,
475
460
ent_result : & EntanglementResult ,
476
461
upload_id : Uuid ,
477
462
) -> Result < Hash , anyhow:: Error > {
@@ -498,7 +483,7 @@ async fn tag_entangled_data(
498
483
. collect :: < Vec < _ > > ( )
499
484
. join ( ", " ) ;
500
485
501
- let batch = iroh. blobs ( ) . batch ( ) . await ?;
486
+ let batch = iroh. blobs_client ( ) . batch ( ) . await ?;
502
487
503
488
// make a hash sequence object from the hashes and upload it to iroh
504
489
let hash_seq = hashes. into_iter ( ) . collect :: < HashSeq > ( ) ;
@@ -512,7 +497,7 @@ async fn tag_entangled_data(
512
497
) ;
513
498
514
499
// this tag will be replaced later by the validator to "stored-seq-{hash_seq_hash}"
515
- let hash_seq_tag = iroh :: blobs :: Tag ( format ! ( "temp-seq-{hash_seq_hash}" ) . into ( ) ) ;
500
+ let hash_seq_tag = iroh_blobs :: Tag ( format ! ( "temp-seq-{hash_seq_hash}" ) . into ( ) ) ;
516
501
batch. persist_to ( temp_tag, hash_seq_tag) . await ?;
517
502
518
503
drop ( batch) ;
@@ -523,13 +508,13 @@ async fn tag_entangled_data(
523
508
. info
524
509
. get ( "tag" )
525
510
. ok_or_else ( || anyhow ! ( "Missing tag in entanglement upload result" ) ) ?;
526
- let tag = iroh :: blobs :: Tag :: from ( tag_value. clone ( ) ) ;
527
- iroh. tags ( ) . delete ( tag) . await ?;
511
+ let tag = iroh_blobs :: Tag :: from ( tag_value. clone ( ) ) ;
512
+ iroh. blobs_client ( ) . tags ( ) . delete ( tag) . await ?;
528
513
}
529
514
530
515
// remove upload tags
531
- let orig_tag = iroh :: blobs :: Tag ( format ! ( "temp-{orig_hash}-{upload_id}" ) . into ( ) ) ;
532
- iroh. tags ( ) . delete ( orig_tag) . await ?;
516
+ let orig_tag = iroh_blobs :: Tag ( format ! ( "temp-{orig_hash}-{upload_id}" ) . into ( ) ) ;
517
+ iroh. blobs_client ( ) . tags ( ) . delete ( orig_tag) . await ?;
533
518
534
519
Ok ( hash_seq_hash)
535
520
}
@@ -580,32 +565,6 @@ pub(crate) struct ObjectRange {
580
565
body : Body ,
581
566
}
582
567
583
- async fn handle_object_download_with_manager < F : QueryClient + Send + Sync > (
584
- address : String ,
585
- tail : Tail ,
586
- method : String ,
587
- range : Option < String > ,
588
- height_query : HeightQuery ,
589
- client : F ,
590
- mut iroh : IrohManager ,
591
- ) -> Result < impl Reply , Rejection > {
592
- let iroh_client = iroh. client ( ) . await . map_err ( |e| {
593
- Rejection :: from ( BadRequest {
594
- message : format ! ( "failed to load iroh client: {}" , e) ,
595
- } )
596
- } ) ?;
597
- handle_object_download (
598
- address,
599
- tail,
600
- method,
601
- range,
602
- height_query,
603
- client,
604
- iroh_client,
605
- )
606
- . await
607
- }
608
-
609
568
async fn handle_object_download < F : QueryClient + Send + Sync > (
610
569
address : String ,
611
570
tail : Tail ,
@@ -637,11 +596,13 @@ async fn handle_object_download<F: QueryClient + Send + Sync>(
637
596
match maybe_object {
638
597
Some ( object) => {
639
598
let hash = Hash :: from_bytes ( object. hash . 0 ) ;
640
- let ( hash, size) = get_blob_hash_and_size ( & iroh, hash) . await . map_err ( |e| {
641
- Rejection :: from ( BadRequest {
642
- message : e. to_string ( ) ,
643
- } )
644
- } ) ?;
599
+ let ( hash, size) = get_blob_hash_and_size ( iroh. blobs_client ( ) , hash)
600
+ . await
601
+ . map_err ( |e| {
602
+ Rejection :: from ( BadRequest {
603
+ message : e. to_string ( ) ,
604
+ } )
605
+ } ) ?;
645
606
646
607
let ent = new_entangler ( & iroh) . map_err ( |e| {
647
608
Rejection :: from ( BadRequest {
@@ -938,13 +899,13 @@ mod tests {
938
899
939
900
/// Prepares test data for object download tests by uploading data, creating entanglement,
940
901
/// and properly tagging the hash sequence
941
- async fn simulate_blob_upload (
942
- iroh : & iroh:: client:: Iroh ,
943
- data : impl Into < Bytes > ,
944
- ) -> ( Hash , Hash ) {
902
+ async fn simulate_blob_upload ( iroh : & IrohNode , data : impl Into < Bytes > ) -> ( Hash , Hash ) {
945
903
let data = data. into ( ) ; // Convert to Bytes first, which implements Send
946
- let ent = new_entangler ( iroh. clone ( ) ) . unwrap ( ) ;
947
- let ent_result = ent. upload ( data) . await . unwrap ( ) ;
904
+ let ent = new_entangler ( iroh) . unwrap ( ) ;
905
+ let data_stream = Box :: pin ( futures:: stream:: once ( async move {
906
+ Ok :: < Bytes , std:: io:: Error > ( data)
907
+ } ) ) ;
908
+ let ent_result = ent. upload ( data_stream) . await . unwrap ( ) ;
948
909
949
910
let metadata = ent
950
911
. download_metadata ( ent_result. metadata_hash . as_str ( ) )
@@ -964,13 +925,13 @@ mod tests {
964
925
)
965
926
. collect :: < HashSeq > ( ) ;
966
927
967
- let batch = iroh. blobs ( ) . batch ( ) . await . unwrap ( ) ;
928
+ let batch = iroh. blobs_client ( ) . batch ( ) . await . unwrap ( ) ;
968
929
let temp_tag = batch. add_bytes ( hash_seq) . await . unwrap ( ) ;
969
930
let hash_seq_hash = * temp_tag. hash ( ) ;
970
931
971
932
// Add a tag to the hash sequence as expected by the system
972
933
let tag_name = format ! ( "temp-seq-{hash_seq_hash}" ) ;
973
- let hash_seq_tag = iroh :: blobs :: Tag ( tag_name. into ( ) ) ;
934
+ let hash_seq_tag = iroh_blobs :: Tag ( tag_name. into ( ) ) ;
974
935
batch. persist_to ( temp_tag, hash_seq_tag) . await . unwrap ( ) ;
975
936
drop ( batch) ;
976
937
0 commit comments