Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions glean-core/src/database/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::fmt::{self, Display};
use std::fs;
use std::num::NonZeroU64;
use std::path::Path;
Expand All @@ -23,7 +24,6 @@ use crate::common_metric_data::CommonMetricDataInternal;
use crate::database::migration::{self, MigrationState};
use crate::metrics::dual_labeled_counter::RECORD_SEPARATOR;
use crate::metrics::Metric;
use crate::Error;
use crate::Glean;
use crate::Lifetime;
use crate::Result;
Expand All @@ -34,7 +34,7 @@ mod schema;
#[derive(Debug)]
pub enum LoadState {
Ok,
Err(Error),
Err(OpenError),
}

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
Expand Down Expand Up @@ -102,7 +102,54 @@ fn database_size(dir: &Path) -> Option<NonZeroU64> {
NonZeroU64::new(total_size)
}

pub fn sqlite_open(path: &Path) -> std::result::Result<(Connection, LoadState), Error> {
#[derive(Debug)]
pub enum OpenError {
IncompatibleVersion(u32),
Corrupt,
SqlError(rusqlite::Error),
RecoveryError(std::io::Error),
}

impl std::error::Error for OpenError {}

impl Display for OpenError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use OpenError::*;
match self {
IncompatibleVersion(v) => write!(f, "Incompatible database version: {v}"),
Corrupt => write!(f, "Database is corrupt"),
SqlError(err) => write!(f, "Error executing SQL: {err}"),
RecoveryError(err) => write!(
f,
"Failed to recover a corrupt database due to an error deleting the file: {err}"
),
}
}
}

impl From<rusqlite::Error> for OpenError {
fn from(value: rusqlite::Error) -> Self {
match value {
rusqlite::Error::SqliteFailure(e, _)
if matches!(e.code, ErrorCode::DatabaseCorrupt | ErrorCode::NotADatabase) =>
{
Self::Corrupt
}
_ => Self::SqlError(value),
}
}
}

impl From<SchemaError> for OpenError {
fn from(value: SchemaError) -> Self {
match value {
SchemaError::Sqlite(err) => OpenError::SqlError(err),
SchemaError::UnsupportedSchemaVersion(v) => OpenError::IncompatibleVersion(v),
}
}
}

pub fn sqlite_open(path: &Path) -> std::result::Result<(Connection, LoadState), OpenError> {
// TODO(bug 2049292): Make this more robust, use the correct errors and see how we can test all the branches
// properly.
match Connection::new::<Schema>(path) {
Expand All @@ -112,24 +159,24 @@ pub fn sqlite_open(path: &Path) -> std::result::Result<(Connection, LoadState),
ErrorCode::PermissionDenied => Err(e.into()),
ErrorCode::NotADatabase => {
log::debug!("sqlite failed: not a database. starting from scratch.");
fs::remove_file(path).map_err(|_| rkv::StoreError::FileInvalid)?;
fs::remove_file(path).map_err(OpenError::RecoveryError)?;
// Now try again, we only handle that error once.
let conn = Connection::new::<Schema>(path)?;
Ok((conn, LoadState::Err(e.into())))
Ok((conn, LoadState::Err(OpenError::Corrupt)))
}
ErrorCode::CannotOpen => {
log::debug!("sqlite failed: cannot open. starting from scratch.");
fs::remove_file(path).map_err(|_| rkv::StoreError::FileInvalid)?;
fs::remove_file(path).map_err(OpenError::RecoveryError)?;
// Now try again, we only handle that error once.
let conn = Connection::new::<Schema>(path)?;
Ok((conn, LoadState::Err(e.into())))
Ok((conn, LoadState::Err(OpenError::Corrupt)))
}
_ => Err(e.into()),
}
}
Err(err @ SchemaError::Sqlite(SqlError::SqlInputError { .. })) => {
log::debug!("sqlite failed: schema migration failed. starting from scratch.");
fs::remove_file(path).map_err(|_| rkv::StoreError::FileInvalid)?;
fs::remove_file(path).map_err(OpenError::RecoveryError)?;
// Now try again, we only handle that error once.
let conn = Connection::new::<Schema>(path)?;
Ok((conn, LoadState::Err(err.into())))
Expand Down Expand Up @@ -198,7 +245,12 @@ impl Database {
/// Get the load state.
pub fn load_state(&self) -> Option<String> {
if let LoadState::Err(e) = &self.load_state {
Some(e.to_string())
Some(match e {
OpenError::IncompatibleVersion(v) => format!("incompatible version: {v}"),
OpenError::Corrupt => "database file corrupt".to_string(),
OpenError::SqlError(error) => format!("sql error: {error:?}"),
OpenError::RecoveryError(error) => format!("recovery error: {error:?}"),
})
} else {
None
}
Expand Down
8 changes: 8 additions & 0 deletions glean-core/src/database/sqlite/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ pub trait ConnectionOpener {
/// Upgrades an existing physical database to the schema with
/// the given version.
fn upgrade(tx: &mut Transaction<'_>, to_version: NonZeroU32) -> Result<(), Self::Error>;

/// Validate that the database is usable.
///
/// Called after `create` / `upgrade`.
fn validate(_tx: &mut Transaction<'_>) -> Result<(), Self::Error> {
Ok(())
}
}

/// A thread-safe wrapper around a connection to a physical SQLite database.
Expand Down Expand Up @@ -72,6 +79,7 @@ impl Connection {
// so that upgrading to it again in the future can fix up any
// invariants that our version might not uphold.
tx.execute_batch(&format!("PRAGMA user_version = {}", O::MAX_SCHEMA_VERSION))?;
O::validate(&mut tx)?;
tx.commit()?;
Ok(Self::with_connection(conn))
}
Expand Down
7 changes: 7 additions & 0 deletions glean-core/src/database/sqlite/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl ConnectionOpener for Schema {
PRAGMA temp_store = MEMORY;
-- allows adding incremental cleanup later
PRAGMA auto_vacuum = INCREMENTAL;
PRAGMA busy_timeout = 0;
",
)?;

Expand Down Expand Up @@ -65,6 +66,12 @@ impl ConnectionOpener for Schema {
fn upgrade(_: &mut Transaction<'_>, to_version: NonZeroU32) -> Result<(), Self::Error> {
Err(SchemaError::UnsupportedSchemaVersion(to_version.get()))
}

fn validate(tx: &mut Transaction<'_>) -> Result<(), Self::Error> {
// A query selecting every field, it doesn't need to return anything.
tx.execute_batch("SELECT id, ping, lifetime, labels, value FROM telemetry WHERE 1 = 0")?;
Ok(())
}
}

#[derive(thiserror::Error, Debug)]
Expand Down
17 changes: 10 additions & 7 deletions glean-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::result;

use rkv::StoreError;

use crate::database::sqlite::SchemaError;
use crate::database::sqlite::{OpenError, SchemaError};

/// A specialized [`Result`] type for this crate's operations.
///
Expand Down Expand Up @@ -173,15 +173,18 @@ impl From<rusqlite::Error> for Error {
}
}

impl From<SchemaError> for Error {
fn from(error: SchemaError) -> Error {
impl From<OpenError> for Error {
fn from(error: OpenError) -> Error {
match error {
SchemaError::Sqlite(err) => Error {
kind: ErrorKind::SQLite(err),
OpenError::IncompatibleVersion(v) => Error {
kind: ErrorKind::Schema(SchemaError::UnsupportedSchemaVersion(v)),
},
err => Error {
kind: ErrorKind::Schema(err),
// TODO
OpenError::Corrupt => Error {
kind: ErrorKind::NotInitialized,
},
OpenError::SqlError(error) => error.into(),
OpenError::RecoveryError(error) => error.into(),
}
}
}
Expand Down
61 changes: 43 additions & 18 deletions glean-core/tests/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::common::*;

use glean_core::metrics::*;
use glean_core::CommonMetricData;
use glean_core::Glean;
use glean_core::Lifetime;
use glean_core::SessionMode;
use rusqlite::params;
use rusqlite::TransactionBehavior;
use uuid::uuid;
Expand Down Expand Up @@ -52,6 +54,9 @@ fn database_file_is_not_sqlite() {

let client_id = clientid_metric().get_value(&glean, None);
assert!(client_id.is_some());

let load_error = load_error_metric().get_value(&glean, None).unwrap();
assert_eq!("database file corrupt", load_error);
}

#[test]
Expand All @@ -75,12 +80,11 @@ fn database_contains_wrong_table() {
let client_id = clientid_metric().get_value(&glean, None);
assert!(client_id.is_some());

let load_error = load_error_metric().get_value(&glean, None);
assert!(load_error.is_some());
let load_error = load_error_metric().get_value(&glean, None).unwrap();
assert!(load_error.starts_with("sql error:"));
}

#[test]
#[ignore]
fn database_contains_correct_user_version_but_wrong_table() {
let temp = {
let (glean, temp) = new_glean(None);
Expand All @@ -100,8 +104,8 @@ fn database_contains_correct_user_version_but_wrong_table() {
let client_id = clientid_metric().get_value(&glean, None);
assert!(client_id.is_some());

let load_error = load_error_metric().get_value(&glean, None);
assert!(load_error.is_some());
let load_error = load_error_metric().get_value(&glean, None).unwrap();
assert!(load_error.starts_with("sql error:"));
}

#[test]
Expand Down Expand Up @@ -153,13 +157,14 @@ fn higher_user_version_upgrade_does_not_crash() {

let client_id = clientid_metric().get_value(&glean, None).unwrap();
assert_eq!(first_client_id, client_id);

let load_error = load_error_metric().get_value(&glean, None);
assert!(load_error.is_none());
}

// Permissions only really work on Unix systems, definitely not on Windows
#[cfg(unix)]
mod unix {
use glean_core::Glean;

use super::*;

#[test]
Expand Down Expand Up @@ -205,28 +210,48 @@ mod unix {
}
}

// TODO(bug 2049295):
// This currently fails.
// The database is locked, so Glean can't access it.
// It's unclear how we should handle that.
// It's not a particular likely case to happen in practice.
#[test]
#[ignore]
fn database_externally_locked() {
// This is NOT the usual case.
// But if the database is already locked, there's little we can do.
// This behavior is the same as if we don't have permissions to access the database file.

let temp = {
let (glean, temp) = new_glean(None);
drop(glean);
temp
};

let path = temp.path().join("db").join("glean.sqlite");
let mut conn = rusqlite::Connection::open(path).unwrap();
let mut conn = rusqlite::Connection::open(&path).unwrap();
let _tx = conn
.transaction_with_behavior(TransactionBehavior::Immediate)
.unwrap();

let (glean, _temp) = new_glean(Some(temp));

let client_id = clientid_metric().get_value(&glean, None);
assert!(client_id.is_some());
let cfg = glean_core::InternalConfiguration {
data_path: path.display().to_string(),
application_id: GLOBAL_APPLICATION_ID.into(),
language_binding_name: "Rust".into(),
upload_enabled: true,
max_events: None,
delay_ping_lifetime_io: false,
app_build: "Unknown".into(),
use_core_mps: false,
trim_data_to_registered_pings: false,
log_level: None,
rate_limit: None,
enable_event_timestamps: false,
experimentation_id: None,
enable_internal_pings: true,
ping_schedule: Default::default(),
ping_lifetime_threshold: 0,
ping_lifetime_max_time: 0,
max_pending_pings_count: None,
max_pending_pings_directory_size: None,
session_inactivity_timeout_ms: 0,
session_mode: SessionMode::Auto,
session_sample_rate: 1.0,
};
let glean = Glean::new(cfg);
assert!(glean.is_err());
}