Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 84 additions & 14 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use graph_store_postgres::find_chain;
use graph_store_postgres::update_chain_name;
use graph_store_postgres::{ConnectionPool, command_support::catalog::block_store};

use crate::manager::prompt::prompt_for_confirmation;
use crate::network_setup::Networks;

pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> {
Expand Down Expand Up @@ -250,50 +251,119 @@ pub async fn change_block_cache_shard(
.await?
.ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?;
let old_shard = chain.shard;
let canonical_backup_name = format!("{chain_name}-old");
let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?;

println!("Current shard: {}", old_shard);

let chain_store = store
.chain_store(&chain_name)
.await
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
let new_name = format!("{}-old", &chain_name);
let ident = chain_store.chain_identifier().await?;
Comment thread
dimitrovmaksim marked this conversation as resolved.
let target_shard = Shard::new(shard)?;

let reuse_existing_backup = match existing_backup.as_ref() {
None => false,
Some(backup) if backup.shard != target_shard => {
bail!(
"`{}` already exists on shard `{}`. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
canonical_backup_name,
backup.shard,
canonical_backup_name,
chain_name,
target_shard,
);
}
Some(backup) => {
let backup_ident = backup.network_identifier()?;
if backup_ident != ident {
bail!(
"`{}` has a different chain identifier ({}) than `{}` ({}). Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
canonical_backup_name,
backup_ident,
chain_name,
ident,
canonical_backup_name,
chain_name,
target_shard,
);
}
let prompt = format!(
"`{}` already exists on shard `{}` and will be reused as the active `{}` chain.\nProceed?",
canonical_backup_name, target_shard, chain_name
);
if !prompt_for_confirmation(&prompt)? {
println!(
"Aborting. Remove `{}` with `graphman chain remove {}` if you want to create a fresh cache on shard `{}`.",
canonical_backup_name, canonical_backup_name, target_shard
);
return Ok(());
}
true
}
};

conn.transaction::<(), StoreError, _>(|conn| {
async {
let shard = Shard::new(shard.to_string())?;
let existing_backup_store = if reuse_existing_backup {
store.chain_store(&canonical_backup_name).await
} else {
None
};

let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?;
if !reuse_existing_backup {
let chain =
BlockStore::allocate_chain(&mut conn, &chain_name, &target_shard, &ident).await?;
store.add_chain_store(&chain, true).await?;
}

store.add_chain_store(&chain, true).await?;
let temp_backup_name = format!("{chain_name}-old-temp");
if reuse_existing_backup && find_chain(&mut conn, &temp_backup_name).await?.is_some() {
bail!(
"`{}` already exists. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`",
temp_backup_name,
temp_backup_name,
chain_name,
target_shard,
);
}

// Drop the foreign key constraint on deployment_schemas
conn.transaction::<(), StoreError, _>(|conn| {
async {
sql_query(
"alter table deployment_schemas drop constraint deployment_schemas_network_fkey;",
)
.execute(conn).await?;

// Update the current chain name to chain-old
update_chain_name(conn, &chain_name, &new_name).await?;
if let Some(backup) = existing_backup.as_ref() {
update_chain_name(conn, &backup.name, &temp_backup_name).await?;
}

update_chain_name(conn, &chain_name, &canonical_backup_name).await?;

// Create a new chain with the name in the destination shard
let _ = add_chain(conn, &chain_name, &shard, ident).await?;
if reuse_existing_backup {
update_chain_name(conn, &temp_backup_name, &chain_name).await?;
} else {
add_chain(conn, &chain_name, &target_shard, ident.clone()).await?;
}

// Re-add the foreign key constraint
sql_query(
"alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);",
)
.execute(conn).await?;

Ok(())
}.scope_boxed()
}).await?;

chain_store.update_name(&new_name).await?;
chain_store.update_name(&canonical_backup_name).await?;

if reuse_existing_backup && let Some(backup_store) = existing_backup_store.as_ref() {
backup_store.update_name(&chain_name).await?;
}

println!(
"Changed block cache shard for {} from {} to {}",
chain_name, old_shard, shard
chain_name, old_shard, target_shard
);

Ok(())
Expand Down
Loading