diff --git a/src/db/migrations/manager.rs b/src/db/migrations/manager.rs index 55166657..ac1502fa 100644 --- a/src/db/migrations/manager.rs +++ b/src/db/migrations/manager.rs @@ -266,7 +266,7 @@ impl MigrationManager { fn get_migration_template(name: &str) -> String { format!( - "use axum::async_trait; + "use async_trait::async_trait; use crate::db::migrations::manager::Migration; use crate::types::DynError; @@ -302,7 +302,6 @@ impl Migration for {name} {{ // Your cleanup logic here Ok(()) }} - }} ", name = name diff --git a/src/db/migrations/migrations_list/mod.rs b/src/db/migrations/migrations_list/mod.rs index 8b137891..5cbb12c4 100644 --- a/src/db/migrations/migrations_list/mod.rs +++ b/src/db/migrations/migrations_list/mod.rs @@ -1 +1 @@ - +pub mod tag_counts_reset_1739459180; diff --git a/src/db/migrations/migrations_list/tag_counts_reset_1739459180.rs b/src/db/migrations/migrations_list/tag_counts_reset_1739459180.rs new file mode 100644 index 00000000..2ead054b --- /dev/null +++ b/src/db/migrations/migrations_list/tag_counts_reset_1739459180.rs @@ -0,0 +1,190 @@ +use crate::db::migrations::manager::Migration; +use crate::models::post::PostCounts; +use crate::models::user::UserCounts; +use crate::types::DynError; +use crate::{get_redis_conn, RedisOps}; +use async_trait::async_trait; +use chrono::Utc; +use log::{debug, error, info}; + +const BATCH_SIZE: u32 = 500; + +pub struct TagCountsReset1739459180; + +#[async_trait] +impl Migration for TagCountsReset1739459180 { + fn id(&self) -> &'static str { + "TagCountsReset1739459180" + } + + fn is_multi_staged(&self) -> bool { + false + } + + async fn dual_write(_data: Box<dyn std::any::Any + Send + Sync>) -> Result<(), DynError> { + // We only need a backfill phase because there is only one source + Ok(()) + } + + async fn backfill(&self) -> Result<(), DynError> { + // Start deleting redis indexes + let pubky_list = dump_counts_from_index("User:Counts:*").await?; + let posts_list = dump_counts_from_index("Post:Counts:*").await?; + // Retrieve 
from graph and index the counts + compute_counts(pubky_list).await?; + compute_counts(posts_list).await?; + + Ok(()) + } + + async fn cutover(&self) -> Result<(), DynError> { + // Not necessary + Ok(()) + } + + async fn cleanup(&self) -> Result<(), DynError> { + // There is not extra cleanup because we did the necessary clean + // in the backfill phase + Ok(()) + } +} + +pub async fn compute_counts(list: Vec<(String, Option<String>)>) -> Result<(), DynError> { + let list_size = list.len(); + let mut success_count = 0; + let mut error_count = 0; + info!( + "Starting compute_counts for {} items at {}", + list_size, + chrono::Utc::now() + ); + + for (pubky, post_id) in list.into_iter() { + if let Some(id) = post_id { + // Processing post counts + match PostCounts::get_from_graph(&pubky, &id).await { + Ok(Some((post_counts, _))) => { + if let Err(e) = post_counts.put_index_json(&[&pubky, &id], None, None).await { + error!("Failed to add Post:Counts:{pubky}:{id}, {e}"); + error_count += 1; + } else { + success_count += 1; + debug!("Successfully added Post:Counts:{pubky}:{id}"); + } + } + Ok(None) => { + error!("Not found from graph Post:Counts:{pubky}:{id}"); + error_count += 1; + } + Err(e) => { + error!("Error fetching from graph for Post:Counts:{pubky}:{id}, {e}"); + error_count += 1; + } + } + } else { + // Processing user counts + match UserCounts::get_from_graph(&pubky).await { + Ok(Some(user_counts)) => { + if let Err(e) = user_counts.put_index_json(&[&pubky], None, None).await { + error!("Failed to add User:Counts:{pubky}, {e}"); + error_count += 1; + } else { + success_count += 1; + debug!("Successfully added User:Counts:{pubky}"); + } + } + Ok(None) => { + error!("User:Counts not found from graph for User:Counts:{pubky}"); + error_count += 1; + } + Err(e) => { + error!("Error fetching from graph, User:Counts:{pubky}, {e}"); + error_count += 1; + } + } + } + } + + info!( + "compute_counts completed. 
Total: {}, success operations: {}, failed operations: {}, Timestamp: {}", + list_size, + success_count, + error_count, + chrono::Utc::now() + ); + + Ok(()) +} + +pub async fn dump_counts_from_index( + pattern: &str, +) -> Result<Vec<(String, Option<String>)>, DynError> { + let mut cursor = 0; + let mut values = Vec::new(); + let mut total_keys_processed = 0; + let mut redis_connection = get_redis_conn().await?; + + info!( + "Starting Redis SCAN for pattern '{}', batch size: {}. Timestamp: {}", + pattern, + BATCH_SIZE, + Utc::now() + ); + + loop { + let start_time = Utc::now(); + // Scan for keys matching the pattern + let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN") + .arg(cursor) + .arg("MATCH") + .arg(pattern) + .arg("COUNT") + .arg(BATCH_SIZE) + .query_async(&mut redis_connection) + .await?; + + let batch_size = keys.len(); + total_keys_processed += batch_size; + + // If keys are found, delete them in a pipeline + if !keys.is_empty() { + let mut pipe = redis::pipe(); + for key in keys { + // Collect key information + values.push(remove_first_two_segments(&key)); + // Add the command to the pipeline + pipe.del(key); + } + let _: () = pipe.query_async(&mut redis_connection).await?; + + info!( + "Deleted {} keys in this batch. Batch completed in {:?} seconds.", + batch_size, + (Utc::now() - start_time).num_milliseconds() as f64 / 1000.0 + ); + } else { + info!( + "No keys found in this batch. Cursor: {}. Continuing...", + new_cursor + ); + } + + // Continue scanning until SCAN finishes + if new_cursor == 0 { + break; + } + cursor = new_cursor; + } + info!( + "=> Redis SCAN completed. 
Total keys processed: {}", + total_keys_processed + ); + Ok(values) +} + +pub fn remove_first_two_segments(key: &str) -> (String, Option<String>) { + let mut parts = key.split(':').skip(2); + let pubky = parts.next().unwrap_or_default().to_string(); + let optional_post_id = parts.next().map(|s| s.to_string()); + (pubky, optional_post_id) +} diff --git a/src/db/migrations/mod.rs b/src/db/migrations/mod.rs index 4885e9e9..4ed28e8e 100644 --- a/src/db/migrations/mod.rs +++ b/src/db/migrations/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use migrations_list::tag_counts_reset_1739459180::TagCountsReset1739459180; use neo4rs::Graph; use tokio::sync::Mutex; @@ -10,8 +11,9 @@ mod migrations_list; mod utils; pub fn get_migration_manager(graph: Arc<Mutex<Graph>>) -> MigrationManager { - // let migration_manager = MigrationManager::new(graph); + let mut migration_manager = MigrationManager::new(graph); // Add your migrations here to be picked up by the manager. Example: - // migration_manager.register(Box::new(MigrationX)); - MigrationManager::new(graph) + migration_manager.register(Box::new(TagCountsReset1739459180)); + migration_manager + //MigrationManager::new(graph) }