-
Notifications
You must be signed in to change notification settings - Fork 859
fix(auth): refresh OAuth2 tokens in long-running watch/subscribe loops #407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
88e4c3d
d0f1fc9
499b450
7b693f1
f40d037
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@googleworkspace/cli": patch | ||
| --- | ||
|
|
||
| Refresh OAuth access tokens for long-running Gmail watch and Workspace Events subscribe helpers before each Pub/Sub and Gmail request. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,9 @@ | ||
| use super::*; | ||
| use crate::auth::AccessTokenProvider; | ||
| use std::path::PathBuf; | ||
|
|
||
| const PUBSUB_API_BASE: &str = "https://pubsub.googleapis.com/v1"; | ||
|
|
||
| #[derive(Debug, Clone, Default, Builder)] | ||
| #[builder(setter(into))] | ||
| pub struct SubscribeConfig { | ||
|
|
@@ -110,6 +113,7 @@ pub(super) async fn handle_subscribe( | |
| } | ||
|
|
||
| let client = crate::client::build_client()?; | ||
| let pubsub_token_provider = auth::token_provider(&[PUBSUB_SCOPE]); | ||
|
|
||
| // Get Pub/Sub token | ||
| let pubsub_token = auth::get_token(&[PUBSUB_SCOPE]) | ||
|
|
@@ -248,29 +252,38 @@ pub(super) async fn handle_subscribe( | |
| }; | ||
|
|
||
| // Pull loop | ||
| let result = pull_loop(&client, &pubsub_token, &pubsub_subscription, config.clone()).await; | ||
| let result = pull_loop( | ||
| &client, | ||
| &pubsub_token_provider, | ||
| &pubsub_subscription, | ||
| config.clone(), | ||
| PUBSUB_API_BASE, | ||
| ) | ||
| .await; | ||
|
|
||
| // On exit, print reconnection info or cleanup | ||
| if created_resources { | ||
| if config.cleanup { | ||
| eprintln!("\nCleaning up Pub/Sub resources..."); | ||
| // Delete Pub/Sub subscription | ||
| let _ = client | ||
| .delete(format!( | ||
| "https://pubsub.googleapis.com/v1/{pubsub_subscription}" | ||
| )) | ||
| .bearer_auth(&pubsub_token) | ||
| .send() | ||
| .await; | ||
| // Delete Pub/Sub topic | ||
| if let Some(ref topic) = topic_name { | ||
| if let Ok(pubsub_token) = pubsub_token_provider.access_token().await { | ||
| let _ = client | ||
| .delete(format!("https://pubsub.googleapis.com/v1/{topic}")) | ||
| .delete(format!("{PUBSUB_API_BASE}/{pubsub_subscription}")) | ||
| .bearer_auth(&pubsub_token) | ||
| .send() | ||
| .await; | ||
| // Delete Pub/Sub topic | ||
| if let Some(ref topic) = topic_name { | ||
| let _ = client | ||
| .delete(format!("{PUBSUB_API_BASE}/{topic}")) | ||
| .bearer_auth(&pubsub_token) | ||
| .send() | ||
| .await; | ||
| } | ||
| eprintln!("Cleanup complete."); | ||
| } else { | ||
| eprintln!("Warning: failed to refresh token for cleanup. Resources may need manual deletion."); | ||
| } | ||
etanase marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| eprintln!("Cleanup complete."); | ||
| } else { | ||
| eprintln!("\n--- Reconnection Info ---"); | ||
| eprintln!( | ||
|
|
@@ -301,21 +314,24 @@ pub(super) async fn handle_subscribe( | |
| /// Pulls messages from a Pub/Sub subscription in a loop. | ||
| async fn pull_loop( | ||
| client: &reqwest::Client, | ||
| token: &str, | ||
| token_provider: &dyn auth::AccessTokenProvider, | ||
| subscription: &str, | ||
| config: SubscribeConfig, | ||
| pubsub_api_base: &str, | ||
| ) -> Result<(), GwsError> { | ||
| let mut file_counter: u64 = 0; | ||
| loop { | ||
| let token = token_provider | ||
| .access_token() | ||
| .await | ||
| .map_err(|e| GwsError::Auth(format!("Failed to get Pub/Sub token: {e}")))?; | ||
| let pull_body = json!({ | ||
| "maxMessages": config.max_messages, | ||
| }); | ||
|
|
||
| let pull_future = client | ||
| .post(format!( | ||
| "https://pubsub.googleapis.com/v1/{subscription}:pull" | ||
| )) | ||
| .bearer_auth(token) | ||
| .post(format!("{pubsub_api_base}/{subscription}:pull")) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The To remediate this, validate each event type in
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above: This is pre-existing on main. Opened #408 to track. |
||
| .bearer_auth(&token) | ||
| .header("Content-Type", "application/json") | ||
| .json(&pull_body) | ||
| .timeout(std::time::Duration::from_secs(config.poll_interval.max(10))) | ||
|
|
@@ -379,10 +395,8 @@ async fn pull_loop( | |
| }); | ||
|
|
||
| let _ = client | ||
| .post(format!( | ||
| "https://pubsub.googleapis.com/v1/{subscription}:acknowledge" | ||
| )) | ||
| .bearer_auth(token) | ||
| .post(format!("{pubsub_api_base}/{subscription}:acknowledge")) | ||
| .bearer_auth(&token) | ||
| .header("Content-Type", "application/json") | ||
| .json(&ack_body) | ||
| .send() | ||
|
|
@@ -526,6 +540,76 @@ fn derive_slug_from_event_types(event_types: &[&str]) -> String { | |
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::auth::FakeTokenProvider; | ||
| use base64::Engine as _; | ||
| use std::sync::Arc; | ||
| use tokio::io::{AsyncReadExt, AsyncWriteExt}; | ||
| use tokio::net::TcpListener; | ||
| use tokio::sync::Mutex; | ||
|
|
||
| async fn spawn_subscribe_server() -> ( | ||
| String, | ||
| Arc<Mutex<Vec<(String, String)>>>, | ||
| tokio::task::JoinHandle<()>, | ||
| ) { | ||
| let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
| let addr = listener.local_addr().unwrap(); | ||
| let requests = Arc::new(Mutex::new(Vec::new())); | ||
| let recorded_requests = Arc::clone(&requests); | ||
|
|
||
| let handle = tokio::spawn(async move { | ||
| for _ in 0..2 { | ||
| let (mut stream, _) = listener.accept().await.unwrap(); | ||
| let mut buf = [0_u8; 8192]; | ||
| let bytes_read = stream.read(&mut buf).await.unwrap(); | ||
| let request = String::from_utf8_lossy(&buf[..bytes_read]); | ||
| let path = request | ||
| .lines() | ||
| .next() | ||
| .and_then(|line| line.split_whitespace().nth(1)) | ||
| .unwrap_or("") | ||
| .to_string(); | ||
| let auth_header = request | ||
| .lines() | ||
| .find(|line| line.to_ascii_lowercase().starts_with("authorization:")) | ||
| .unwrap_or("") | ||
| .trim() | ||
| .to_string(); | ||
| recorded_requests | ||
| .lock() | ||
| .await | ||
| .push((path.clone(), auth_header)); | ||
|
|
||
| let body = match path.as_str() { | ||
| "/v1/projects/test/subscriptions/demo:pull" => json!({ | ||
| "receivedMessages": [{ | ||
| "ackId": "ack-1", | ||
| "message": { | ||
| "attributes": { | ||
| "type": "google.workspace.chat.message.v1.created", | ||
| "source": "//chat/spaces/A" | ||
| }, | ||
| "data": base64::engine::general_purpose::STANDARD | ||
| .encode(json!({ "id": "evt-1" }).to_string()) | ||
| } | ||
| }] | ||
| }) | ||
| .to_string(), | ||
| "/v1/projects/test/subscriptions/demo:acknowledge" => json!({}).to_string(), | ||
| other => panic!("unexpected request path: {other}"), | ||
| }; | ||
|
|
||
| let response = format!( | ||
| "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nConnection: close\r\nContent-Length: {}\r\n\r\n{}", | ||
| body.len(), | ||
| body | ||
| ); | ||
| stream.write_all(response.as_bytes()).await.unwrap(); | ||
| } | ||
| }); | ||
|
|
||
| (format!("http://{addr}/v1"), requests, handle) | ||
| } | ||
|
|
||
| fn make_matches_subscribe(args: &[&str]) -> ArgMatches { | ||
| let cmd = Command::new("test") | ||
|
|
@@ -753,4 +837,42 @@ mod tests { | |
| let err_msg = result.unwrap_err().to_string(); | ||
| assert!(err_msg.contains("--project is required")); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_pull_loop_refreshes_pubsub_token_between_requests() { | ||
| let client = reqwest::Client::new(); | ||
| let token_provider = FakeTokenProvider::new(["pubsub-token"]); | ||
| let (pubsub_base, requests, server) = spawn_subscribe_server().await; | ||
| let config = SubscribeConfigBuilder::default() | ||
| .subscription(Some(SubscriptionName( | ||
| "projects/test/subscriptions/demo".to_string(), | ||
| ))) | ||
| .max_messages(1_u32) | ||
| .poll_interval(1_u64) | ||
| .once(true) | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| pull_loop( | ||
| &client, | ||
| &token_provider, | ||
| "projects/test/subscriptions/demo", | ||
| config, | ||
| &pubsub_base, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| server.await.unwrap(); | ||
|
|
||
| let requests = requests.lock().await; | ||
| assert_eq!(requests.len(), 2); | ||
| assert_eq!(requests[0].0, "/v1/projects/test/subscriptions/demo:pull"); | ||
| assert_eq!(requests[0].1, "authorization: Bearer pubsub-token"); | ||
| assert_eq!( | ||
| requests[1].0, | ||
| "/v1/projects/test/subscriptions/demo:acknowledge" | ||
| ); | ||
| assert_eq!(requests[1].1, "authorization: Bearer pubsub-token"); | ||
| } | ||
|
Comment on lines
+842
to
+877
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test name async fn test_pull_loop_refreshes_pubsub_token_between_requests() {
let client = reqwest::Client::new();
let token_provider = FakeTokenProvider::new(["token-for-pull", "token-for-ack"]);
let (pubsub_base, requests, server) = spawn_subscribe_server().await;
let config = SubscribeConfigBuilder::default()
.subscription(Some(SubscriptionName(
"projects/test/subscriptions/demo".to_string(),
)))
.max_messages(1_u32)
.poll_interval(1_u64)
.once(true)
.build()
.unwrap();
pull_loop(
&client,
&token_provider,
"projects/test/subscriptions/demo",
config,
&pubsub_base,
)
.await
.unwrap();
server.await.unwrap();
let requests = requests.lock().await;
assert_eq!(requests.len(), 2);
assert_eq!(requests[0].0, "/v1/projects/test/subscriptions/demo:pull");
assert_eq!(requests[0].1, "authorization: Bearer token-for-pull");
assert_eq!(
requests[1].0,
"/v1/projects/test/subscriptions/demo:acknowledge"
);
assert_eq!(requests[1].1, "authorization: Bearer token-for-ack");
}
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test confirms that the token is fetched per loop iteration. As per earlier review, it does not refresh the token within the same loop for the ack. |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
topicname is constructed using aslugderived from user-suppliedevent-typeswithout validation. If an attacker provides an event type containing path traversal segments (e.g.,google.workspace.drive.file.v1.updated/../something), the resultingtopicname will contain these segments, leading to a URL path traversal vulnerability when calling the Pub/Sub API. Althoughprojectis validated, theslugpart of the resource name remains a vector for injection.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also pre-existing on main. Opened #408 to track.