diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 874b6e36d2b..89e1674804a 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -251,6 +251,76 @@ pub fn features(conn: &PgConnection, site: &Site) -> Result, + full_count_query: &str, + count: i32, +) -> Result<(), StoreError> { + use crate::diesel::BoolExpressionMethods; + use subgraph_deployment as d; + + // Work around a Diesel issue with serializing BigDecimals to numeric + let number = format!("{}::numeric", ptr.number); + + let count_sql = if count == 0 { + // This amounts to a noop - the entity count does not change + "entity_count".to_string() + } else { + entity_count_sql(full_count_query, count) + }; + + // Treat a cursor of "" as null; not absolutely necessary for + // correctness since the firehose treats both as the same, but makes it + // a little clearer. + let firehose_cursor = if firehose_cursor == Some("") { + None + } else { + firehose_cursor + }; + + let row_count = update( + d::table.filter(d::id.eq(site.id)).filter( + // Asserts that the processing direction is forward. + d::latest_ethereum_block_number + .lt(sql(&number)) + .or(d::latest_ethereum_block_number.is_null()), + ), + ) + .set(( + d::latest_ethereum_block_number.eq(sql(&number)), + d::latest_ethereum_block_hash.eq(ptr.hash_slice()), + d::firehose_cursor.eq(firehose_cursor), + d::entity_count.eq(sql(&count_sql)), + d::current_reorg_depth.eq(0), + )) + .execute(conn) + .map_err(StoreError::from)?; + + match row_count { + // Common case: A single row was updated. + 1 => Ok(()), + + // No matching rows were found. This is an error. By the filter conditions, this can only be + // due to a missing deployment (which `block_ptr` catches) or duplicate block processing. + 0 => match block_ptr(&conn, &site.deployment)? { + Some(block_ptr_from) if block_ptr_from.number >= ptr.number => Err( + StoreError::DuplicateBlockProcessing(site.deployment.clone(), ptr.number), + ), + None | Some(_) => Err(StoreError::Unknown(anyhow!( + "unknown error forwarding block ptr" + ))), + }, + + // More than one matching row was found. + _ => Err(StoreError::ConstraintViolation( + "duplicate deployments in shard".to_owned(), + )), + } +} + pub fn forward_block_ptr( conn: &PgConnection, id: &DeploymentHash, @@ -886,18 +956,7 @@ pub fn create_deployment( Ok(()) } -pub fn update_entity_count( - conn: &PgConnection, - site: &Site, - full_count_query: &str, - count: i32, -) -> Result<(), StoreError> { - use subgraph_deployment as d; - - if count == 0 { - return Ok(()); - } - +fn entity_count_sql(full_count_query: &str, count: i32) -> String { // The big complication in this query is how to determine what the // new entityCount should be. We want to make sure that if the entityCount // is NULL or the special value `-1`, it gets recomputed. Using `-1` here @@ -912,14 +971,29 @@ pub fn update_entity_count( // is `NULL` or `-1`, forcing `coalesce` to evaluate its second // argument, the query to count entities. In all other cases, // `coalesce` does not evaluate its second argument - let count_update = format!( + format!( "coalesce((nullif(entity_count, -1)) + ({count}), ({full_count_query}))", full_count_query = full_count_query, count = count - ); + ) +} + +pub fn update_entity_count( + conn: &PgConnection, + site: &Site, + full_count_query: &str, + count: i32, +) -> Result<(), StoreError> { + use subgraph_deployment as d; + + if count == 0 { + return Ok(()); + } + + let count_sql = entity_count_sql(full_count_query, count); update(d::table.filter(d::id.eq(site.id))) - .set(d::entity_count.eq(sql(&count_update))) + .set(d::entity_count.eq(sql(&count_sql))) .execute(conn)?; Ok(()) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d89632a049e..926a5de4636 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1008,12 +1008,6 @@ impl DeploymentStore { &block_ptr_to, stopwatch, )?; - deployment::update_entity_count( - &conn, - site.as_ref(), - layout.count_query.as_str(), - count, - )?; section.end(); dynds::insert(&conn, &site.deployment, data_sources, &block_ptr_to)?; @@ -1027,13 +1021,14 @@ impl DeploymentStore { )?; } - deployment::forward_block_ptr(&conn, &site.deployment, block_ptr_to)?; - - if let Some(cursor) = firehose_cursor { - if cursor != "" { - deployment::update_firehose_cursor(&conn, &site.deployment, cursor)?; - } - } + deployment::transact_block( + &conn, + &site, + block_ptr_to, + firehose_cursor, + layout.count_query.as_str(), + count, + )?; Ok(event) })?;