Skip to content

Commit f2e18e2

Browse files
authored
feat: log when initializing a nodes table entry (#2104)
- have mysql ignore on conflicts, then propagate other errors - ensure the node is a url Closes STOR-500
1 parent a66dfa7 commit f2e18e2

File tree

12 files changed

+113
-76
lines changed

12 files changed

+113
-76
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

syncserver-settings/src/lib.rs

Lines changed: 75 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -70,71 +70,92 @@ impl Settings {
7070
// `SYNC_FOO__BAR_VALUE="gorp"` as `foo.bar_value = "gorp"`
7171
s.merge(Environment::with_prefix(&PREFIX.to_uppercase()).separator("__"))?;
7272

73-
match s.try_into::<Self>() {
74-
Ok(mut s) => {
75-
s.syncstorage.normalize();
76-
if s.worker_max_blocking_threads == 0 {
77-
// Db backends w/ blocking calls block via
78-
// actix-threadpool: grow its size to accommodate the
79-
// full number of connections
80-
let total_db_pool_size = {
81-
let syncstorage_pool_max_size =
82-
if s.syncstorage.uses_spanner() || !s.syncstorage.enabled {
83-
0
84-
} else {
85-
s.syncstorage.database_pool_max_size
86-
};
87-
88-
let tokenserver_pool_max_size = if s.tokenserver.enabled {
89-
s.tokenserver.database_pool_max_size
90-
} else {
91-
0
92-
};
93-
94-
syncstorage_pool_max_size + tokenserver_pool_max_size
95-
};
96-
97-
let fxa_threads = if s.tokenserver.enabled
98-
&& s.tokenserver.fxa_oauth_primary_jwk.is_none()
99-
&& s.tokenserver.fxa_oauth_secondary_jwk.is_none()
100-
{
101-
s.tokenserver
102-
.additional_blocking_threads_for_fxa_requests
103-
.ok_or_else(|| {
104-
println!(
105-
"If the Tokenserver OAuth JWK is not cached, additional blocking \
106-
threads must be used to handle the requests to FxA."
107-
);
108-
109-
let setting_name =
110-
"tokenserver.additional_blocking_threads_for_fxa_requests";
111-
ConfigError::NotFound(String::from(setting_name))
112-
})?
113-
} else {
114-
0
115-
};
116-
s.worker_max_blocking_threads =
117-
(total_db_pool_size + fxa_threads).max(num_cpus::get() as u32 * 5) as usize;
118-
}
119-
Ok(s)
120-
}
121-
// Configuration errors are not very sysop friendly, Try to make them
122-
// a bit more 3AM useful.
123-
Err(ConfigError::Message(v)) => {
73+
let mut s = s.try_into::<Self>().map_err(|e| match e {
74+
ConfigError::Message(v) => {
12475
println!("Bad configuration: {:?}", &v);
12576
println!("Please set in config file or use environment variable.");
12677
println!(
12778
"For example to set `database_url` use env var `{}_DATABASE_URL`\n",
12879
PREFIX.to_uppercase()
12980
);
13081
error!("Configuration error: Value undefined {:?}", &v);
131-
Err(ConfigError::NotFound(v))
82+
ConfigError::NotFound(v)
13283
}
133-
Err(e) => {
84+
e => {
13485
error!("Configuration error: Other: {:?}", &e);
135-
Err(e)
86+
e
87+
}
88+
})?;
89+
s.normalize()?;
90+
s.validate()?;
91+
Ok(s)
92+
}
93+
94+
/// Adjust values if required
95+
pub fn normalize(&mut self) -> Result<(), ConfigError> {
96+
self.syncstorage.normalize();
97+
98+
if self.worker_max_blocking_threads != 0 {
99+
return Ok(());
100+
}
101+
102+
// Db backends w/ blocking calls block via actix-threadpool: grow its
103+
// size to accommodate the full number of connections
104+
let total_db_pool_size = {
105+
let syncstorage_pool_max_size =
106+
if self.syncstorage.uses_spanner() || !self.syncstorage.enabled {
107+
0
108+
} else {
109+
self.syncstorage.database_pool_max_size
110+
};
111+
112+
let tokenserver_pool_max_size = if self.tokenserver.enabled {
113+
self.tokenserver.database_pool_max_size
114+
} else {
115+
0
116+
};
117+
118+
syncstorage_pool_max_size + tokenserver_pool_max_size
119+
};
120+
121+
let fxa_threads = if self.tokenserver.enabled
122+
&& self.tokenserver.fxa_oauth_primary_jwk.is_none()
123+
&& self.tokenserver.fxa_oauth_secondary_jwk.is_none()
124+
{
125+
self.tokenserver
126+
.additional_blocking_threads_for_fxa_requests
127+
.ok_or_else(|| {
128+
println!(
129+
"If the Tokenserver OAuth JWK is not cached, additional blocking \
130+
threads must be used to handle the requests to FxA."
131+
);
132+
133+
let setting_name = "tokenserver.additional_blocking_threads_for_fxa_requests";
134+
ConfigError::NotFound(String::from(setting_name))
135+
})?
136+
} else {
137+
0
138+
};
139+
self.worker_max_blocking_threads =
140+
(total_db_pool_size + fxa_threads).max(num_cpus::get() as u32 * 5) as usize;
141+
142+
Ok(())
143+
}
144+
145+
pub fn validate(&self) -> Result<(), ConfigError> {
146+
if let Some(init_node_url) = &self.tokenserver.init_node_url {
147+
let url = Url::parse(init_node_url).map_err(|e| {
148+
ConfigError::Message(format!("Invalid SYNC_TOKENSERVER__INIT_NODE_URL: {e}"))
149+
})?;
150+
if !["http", "https"].contains(&url.scheme()) {
151+
return Err(ConfigError::Message(
152+
"Invalid SYNC_TOKENSERVER__INIT_NODE_URL: \
153+
requires an \"https\"/\"http\" scheme"
154+
.to_owned(),
155+
));
136156
}
137157
}
158+
Ok(())
138159
}
139160

140161
#[cfg(debug_assertions)]

tokenserver-db-common/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ pub trait Db {
7070
async fn check(&mut self) -> DbResult<results::Check>;
7171

7272
/// Insert an initial Sync 1.5 node record. Does nothing when there is a conflict.
73-
async fn insert_sync15_node(&mut self, params: params::Sync15Node) -> DbResult<()>;
73+
/// Returns whether a node entry was added.
74+
async fn insert_sync15_node(&mut self, params: params::Sync15Node) -> DbResult<bool>;
7475

7576
/// Get Node ID based on service_id and node string.
7677
async fn get_node_id(&mut self, params: params::GetNodeId) -> DbResult<results::GetNodeId>;

tokenserver-db/src/mock.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ impl Db for MockDb {
113113
Ok(results::GetServiceId::default())
114114
}
115115

116-
async fn insert_sync15_node(&mut self, _params: params::Sync15Node) -> Result<(), DbError> {
117-
Ok(())
116+
async fn insert_sync15_node(&mut self, _params: params::Sync15Node) -> Result<bool, DbError> {
117+
Ok(false)
118118
}
119119

120120
fn metrics(&self) -> &Metrics {

tokenserver-mysql/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ diesel.workspace = true
1313
diesel-async.workspace = true
1414
diesel_migrations.workspace = true
1515
http.workspace = true
16+
slog-scope.workspace = true
17+
1618
syncserver-common = { path = "../syncserver-common" }
1719
syncserver-db-common = { path = "../syncserver-db-common" }
1820
tokenserver-common = { path = "../tokenserver-common" }

tokenserver-mysql/src/db/db_impl.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,10 +377,10 @@ impl Db for TokenserverDb {
377377
Ok(result)
378378
}
379379

380-
async fn insert_sync15_node(&mut self, params: params::Sync15Node) -> DbResult<()> {
380+
async fn insert_sync15_node(&mut self, params: params::Sync15Node) -> DbResult<bool> {
381381
let query = format!(
382382
r#"
383-
INSERT INTO nodes (service, node, available, current_load, capacity, downed, backoff)
383+
INSERT IGNORE INTO nodes (service, node, available, current_load, capacity, downed, backoff)
384384
VALUES (
385385
(SELECT id FROM services WHERE service = '{}'),
386386
?, 1, 0, ?, 0, 0
@@ -389,13 +389,13 @@ impl Db for TokenserverDb {
389389
params::Sync15Node::SERVICE_NAME
390390
);
391391

392-
diesel::sql_query(query)
392+
let affected_rows = diesel::sql_query(query)
393393
.bind::<Text, _>(&params.node)
394394
.bind::<Integer, _>(params.capacity)
395395
.execute(&mut self.conn)
396396
.await?;
397397

398-
Ok(())
398+
Ok(affected_rows == 1)
399399
}
400400

401401
#[cfg(debug_assertions)]

tokenserver-mysql/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#[macro_use]
2+
extern crate slog_scope;
3+
14
mod db;
25
mod pool;
36

tokenserver-mysql/src/pool.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,17 @@ impl TokenserverPool {
122122

123123
/// Bootstrap the initial Sync 1.5 node record if init_node_url is set.
124124
async fn init_sync15_node(&mut self, node_url: String, capacity: i32) -> Result<(), DbError> {
125-
let _ = self
125+
let node_added = self
126126
.get()
127127
.await?
128128
.insert_sync15_node(params::Sync15Node {
129-
node: node_url,
129+
node: node_url.clone(),
130130
capacity,
131131
})
132-
.await;
132+
.await?;
133+
if node_added {
134+
info!("Initialized syncstorage node entry, node: {node_url:?} capacity: {capacity}");
135+
}
133136
Ok(())
134137
}
135138
}
@@ -153,9 +156,8 @@ impl DbPool for TokenserverPool {
153156

154157
// Init the Sync 1.5 node record if init_node_url is set
155158
if let Some(node_url) = self.init_node_url.clone() {
156-
let _ = self
157-
.init_sync15_node(node_url, self.init_node_capacity)
158-
.await;
159+
self.init_sync15_node(node_url, self.init_node_capacity)
160+
.await?;
159161
}
160162

161163
Ok(())

tokenserver-postgres/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ diesel = { workspace = true, features = ["postgres"] }
1212
diesel-async = { workspace = true, features = ["postgres"] }
1313
diesel_migrations.workspace = true
1414
http.workspace = true
15+
slog-scope.workspace = true
1516

1617
syncserver-common = { path = "../syncserver-common" }
1718
syncserver-db-common = { path = "../syncserver-db-common" }

tokenserver-postgres/src/db/db_impl.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl Db for TokenserverPgDb {
6767
// Nodes Table Methods
6868

6969
/// Upsert the initial node record for Sync 1.5.
70-
async fn insert_sync15_node(&mut self, params: params::Sync15Node) -> DbResult<()> {
70+
async fn insert_sync15_node(&mut self, params: params::Sync15Node) -> DbResult<bool> {
7171
let query = format!(
7272
r#"
7373
INSERT INTO nodes (service, node, available, current_load, capacity, downed, backoff)
@@ -80,13 +80,13 @@ impl Db for TokenserverPgDb {
8080
params::Sync15Node::SERVICE_NAME
8181
);
8282

83-
diesel::sql_query(query)
83+
let affected_rows = diesel::sql_query(query)
8484
.bind::<Text, _>(&params.node)
8585
.bind::<Integer, _>(params.capacity)
8686
.execute(&mut self.conn)
8787
.await?;
8888

89-
Ok(())
89+
Ok(affected_rows == 1)
9090
}
9191

9292
/// Get Node with complete metadata, given a provided Node ID.

0 commit comments

Comments
 (0)