Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 89 additions & 15 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,76 @@ pub fn features(conn: &PgConnection, site: &Site) -> Result<BTreeSet<SubgraphFea
.collect()
}

pub fn transact_block(
conn: &PgConnection,
site: &Site,
ptr: &BlockPtr,
firehose_cursor: Option<&str>,
full_count_query: &str,
count: i32,
) -> Result<(), StoreError> {
use crate::diesel::BoolExpressionMethods;
use subgraph_deployment as d;

// Work around a Diesel issue with serializing BigDecimals to numeric
let number = format!("{}::numeric", ptr.number);

let count_sql = if count == 0 {
// This amounts to a noop - the entity count does not change
"entity_count".to_string()
} else {
entity_count_sql(full_count_query, count)
};

// Treat a cursor of "" as null; not absolutely necessary for
// correctness since the firehose treats both as the same, but makes it
// a little clearer.
let firehose_cursor = if firehose_cursor == Some("") {
None
} else {
firehose_cursor
};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplication with fn update_entity_count is unfortunate, would it be possible to not duplicate this code?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I factored that scary comment and string generation into a separate function.


let row_count = update(
d::table.filter(d::id.eq(site.id)).filter(
// Asserts that the processing direction is forward.
d::latest_ethereum_block_number
.lt(sql(&number))
.or(d::latest_ethereum_block_number.is_null()),
),
)
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
d::firehose_cursor.eq(firehose_cursor),
d::entity_count.eq(sql(&count_sql)),
d::current_reorg_depth.eq(0),
))
.execute(conn)
.map_err(StoreError::from)?;

match row_count {
// Common case: A single row was updated.
1 => Ok(()),

// No matching rows were found. This is an error. By the filter conditions, this can only be
// due to a missing deployment (which `block_ptr` catches) or duplicate block processing.
0 => match block_ptr(&conn, &site.deployment)? {
Some(block_ptr_from) if block_ptr_from.number >= ptr.number => Err(
StoreError::DuplicateBlockProcessing(site.deployment.clone(), ptr.number),
),
None | Some(_) => Err(StoreError::Unknown(anyhow!(
"unknown error forwarding block ptr"
))),
},

// More than one matching row was found.
_ => Err(StoreError::ConstraintViolation(
"duplicate deployments in shard".to_owned(),
)),
}
}

pub fn forward_block_ptr(
conn: &PgConnection,
id: &DeploymentHash,
Expand Down Expand Up @@ -886,18 +956,7 @@ pub fn create_deployment(
Ok(())
}

pub fn update_entity_count(
conn: &PgConnection,
site: &Site,
full_count_query: &str,
count: i32,
) -> Result<(), StoreError> {
use subgraph_deployment as d;

if count == 0 {
return Ok(());
}

fn entity_count_sql(full_count_query: &str, count: i32) -> String {
// The big complication in this query is how to determine what the
// new entityCount should be. We want to make sure that if the entityCount
// is NULL or the special value `-1`, it gets recomputed. Using `-1` here
Expand All @@ -912,14 +971,29 @@ pub fn update_entity_count(
// is `NULL` or `-1`, forcing `coalesce` to evaluate its second
// argument, the query to count entities. In all other cases,
// `coalesce` does not evaluate its second argument
let count_update = format!(
format!(
"coalesce((nullif(entity_count, -1)) + ({count}),
({full_count_query}))",
full_count_query = full_count_query,
count = count
);
)
}

pub fn update_entity_count(
conn: &PgConnection,
site: &Site,
full_count_query: &str,
count: i32,
) -> Result<(), StoreError> {
use subgraph_deployment as d;

if count == 0 {
return Ok(());
}

let count_sql = entity_count_sql(full_count_query, count);
update(d::table.filter(d::id.eq(site.id)))
.set(d::entity_count.eq(sql(&count_update)))
.set(d::entity_count.eq(sql(&count_sql)))
.execute(conn)?;
Ok(())
}
Expand Down
21 changes: 8 additions & 13 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,12 +1008,6 @@ impl DeploymentStore {
&block_ptr_to,
stopwatch,
)?;
deployment::update_entity_count(
&conn,
site.as_ref(),
layout.count_query.as_str(),
count,
)?;
section.end();

dynds::insert(&conn, &site.deployment, data_sources, &block_ptr_to)?;
Expand All @@ -1027,13 +1021,14 @@ impl DeploymentStore {
)?;
}

deployment::forward_block_ptr(&conn, &site.deployment, block_ptr_to)?;

if let Some(cursor) = firehose_cursor {
if cursor != "" {
deployment::update_firehose_cursor(&conn, &site.deployment, cursor)?;
}
}
deployment::transact_block(
&conn,
&site,
block_ptr_to,
firehose_cursor,
layout.count_query.as_str(),
count,
)?;

Ok(event)
})?;
Expand Down