niamoto.common package¶
Subpackages¶
Submodules¶
niamoto.common.config module¶
- class niamoto.common.config.Config(config_dir=None, create_default=True)¶
Bases:
object
Class to manage all Niamoto configuration files:
- config.yml (global env settings: database, logs, outputs, etc.)
- import.yml (data sources)
- transform.yml (transformations)
- export.yml (widgets)
- Parameters:
config_dir (str | None)
create_default (bool)
- classmethod clear_cache()¶
Clear in-process config caches. Mainly useful for tests.
- Return type:
None
- static get_niamoto_home()¶
Return the Niamoto home directory.
This method checks whether the ‘NIAMOTO_HOME’ environment variable is set. If it is, the method returns that path; otherwise, it falls back to the current working directory.
- Return type:
str
- property database_path: str¶
Get the database path from config.yml.
- Returns:
database path (resolved to an absolute path)
- Return type:
str
- property logs_path: str¶
Get the logs path from config.yml.
- Returns:
logs path
- Return type:
str
- property get_export_config: Dict[str, str]¶
Get the output paths from config.yml.
- Returns:
output paths
- Return type:
Dict[str, str]
- property get_deploy_config: Dict[str, Any]¶
Get the deploy configuration from deploy.yml (optional).
- Returns:
Dict with keys platform, project_name, branch, extra. Empty dict if deploy.yml does not exist.
- Return type:
Dict[str, Any]
- property get_imports_config: GenericImportConfig¶
Get the generic import configuration from import.yml.
- Returns:
Typed import configuration with entities.references and entities.datasets
- Return type:
GenericImportConfig
- Raises:
ConfigurationError – If import.yml is missing or doesn’t follow the entities schema
- get_transforms_config()¶
Get the transformations config from transform.yml.
- Returns:
transformations config
- Return type:
List[Dict[str, Any]]
- get_exports_config()¶
Get the exports config from export.yml.
- Returns:
exports config
- Return type:
List[Dict[str, Any]]
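The fallback described for get_niamoto_home() can be illustrated with a small stand-in. This is a minimal sketch of the documented behaviour, not the library's code: the function below is a local re-implementation written for illustration, and it assumes that "set" means the variable is present in the environment, even when empty.

```python
import os


def get_niamoto_home() -> str:
    """Stand-in for the documented lookup (illustrative, not Niamoto's code)."""
    # Assumption: any value present in the environment counts as "set".
    if "NIAMOTO_HOME" in os.environ:
        return os.environ["NIAMOTO_HOME"]
    # Documented fallback: the current working directory.
    return os.getcwd()


os.environ["NIAMOTO_HOME"] = "/tmp/niamoto-demo"
home_from_env = get_niamoto_home()

del os.environ["NIAMOTO_HOME"]
home_from_cwd = get_niamoto_home()
```

In this sketch home_from_env is the value of the variable and home_from_cwd is whatever directory the process was started in.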
niamoto.common.database module¶
This module provides a Database class for connecting to and interacting with a database.
The Database class offers methods to establish a connection, get new sessions, add instances to the database, and close sessions.
- class niamoto.common.database.Database(db_path, optimize=True, engine=None, read_only=False)¶
Bases:
object
A class that provides a connection to a database and offers methods to interact with it.
- Parameters:
db_path (str)
optimize (bool)
engine (Engine | None)
read_only (bool)
- enable_connection_reuse()¶
Enable per-thread connection reuse for hot read/write loops.
- Return type:
None
- disable_connection_reuse()¶
Disable and close any reused connection for the current thread.
- Return type:
None
- connection()¶
Yield a SQLAlchemy connection, optionally reused within the current thread.
- Return type:
Iterator[Connection]
- create_indexes_for_table(table_name, index_columns=None)¶
Create indexes on a dynamically created table.
This method is designed for tables created during import/transform phases where table names are defined by user configuration.
- Parameters:
table_name (str) – Name of the dynamically created table
index_columns (List[str], optional) – List of columns to index. If None, columns are auto-detected:
- Columns ending with _id (likely foreign keys)
- Common query columns (id, type, name, etc.)
- Return type:
None
Example
# After creating the 'occurrences' table from CSV
db.create_indexes_for_table('occurrences')
# After a transform creates the 'taxon_stats' table
db.create_indexes_for_table('taxon_stats', ['taxon_id', 'year'])
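The auto-detection rule for index_columns might look roughly like the following. This is a sketch under stated assumptions: guess_index_columns and COMMON_QUERY_COLUMNS are hypothetical names, and the set of "common query columns" contains only the three the description names (id, type, name), since the full list is not given.

```python
from typing import List, Optional

# Hypothetical: the documentation lists id, type, name "etc."; only those three are used here.
COMMON_QUERY_COLUMNS = {"id", "type", "name"}


def guess_index_columns(
    columns: List[str], index_columns: Optional[List[str]] = None
) -> List[str]:
    """Pick the columns to index, mimicking the documented heuristic."""
    if index_columns is not None:
        # An explicit list wins over auto-detection.
        return list(index_columns)
    # Auto-detect: likely foreign keys (*_id) plus common query columns.
    return [c for c in columns if c.endswith("_id") or c in COMMON_QUERY_COLUMNS]


auto = guess_index_columns(["id", "taxon_id", "year", "geometry"])
explicit = guess_index_columns(["id", "taxon_id", "year"], ["taxon_id", "year"])
```

Here auto keeps id and taxon_id, while explicit is returned unchanged.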
- optimize_all_tables()¶
Optimize all tables in the database by creating missing indexes. Useful after import/transform operations that create new tables.
- Return type:
None
- has_table(table_name)¶
Check if a table exists in the database.
- Parameters:
table_name (str) – The name of the table to check.
- Returns:
True if the table exists, False otherwise.
- Return type:
bool
- invalidate_table_names_cache()¶
Invalidate cached table names (call after DDL operations).
- Return type:
None
- get_table_names(max_age_seconds=2.0)¶
Return table names using the backend-safe cache path.
- Parameters:
max_age_seconds (float)
- Return type:
List[str]
- get_new_session()¶
Get a new session from the session factory.
- Returns:
A new session.
- Return type:
scoped_session[Session]
- add_instance_and_commit(instance)¶
Add an instance to the session and commit.
- Parameters:
instance (Any) – The instance to be added.
- Return type:
None
- execute_select(sql)¶
Execute a SELECT query using the database engine.
- Parameters:
sql (str) – A string containing the SELECT query to be executed.
- Returns:
The result of the query if successful, None otherwise.
- Return type:
Optional[Any]
Note
For DuckDB, this method disposes of the connection pool before executing queries to ensure we see the latest committed data. This is necessary because DuckDB uses in-process connections that may cache database state.
- execute_sql(sql, params=None, fetch=False, *, fetch_all=False)¶
Execute a raw SQL query using the database engine.
- Parameters:
sql (str) – A string containing the SQL query to be executed.
params (Dict[str, Any], optional) – Parameters to bind to the query.
fetch (bool) – Whether to fetch one row from the result set if the query returns data.
fetch_all (bool)
- Returns:
The result of the query if fetch is True and a result is available, or None otherwise.
- Return type:
Optional[Any]
- commit_session()¶
Commit the current session to the database.
- Return type:
None
- rollback_session()¶
Rollback the current session, discarding any uncommitted changes.
- Return type:
None
- close_db_session()¶
Close the database session.
- Return type:
None
- begin_transaction()¶
Begin a new transaction.
- Return type:
None
- commit_transaction()¶
Commit the current transaction.
- Return type:
None
- rollback_transaction()¶
Rollback the current transaction.
- Return type:
None
- get_table_columns(table_name)¶
Return column names for the given table.
- Parameters:
table_name (str)
- Return type:
List[str]
- get_columns(table_name)¶
Return SQLAlchemy inspector-like column metadata for a table.
- Parameters:
table_name (str)
- Return type:
List[Dict[str, Any]]
- get_table_schema(table_name)¶
Return table schema as list of {name, type} dictionaries.
- Parameters:
table_name (str)
- Return type:
List[Dict[str, Any]]
- optimize_database()¶
Run database optimization routines.
This method can be called periodically to maintain optimal performance:
- For SQLite: applies PRAGMA optimizations, ANALYZE, and VACUUM
- For DuckDB: runs CHECKPOINT and VACUUM to compact the database
- Creates any missing indexes (SQLite only; DuckDB doesn’t support index reflection)
- Return type:
None
- get_database_stats()¶
Get database statistics and performance metrics.
- Returns:
Dict containing database size, page count, cache statistics, etc.
- Return type:
Dict[str, Any]
- fetch_all(query, params=None)¶
Executes a raw SQL query and returns all results as a list of dictionaries.
- Parameters:
query (str)
params (Dict[str, Any] | None)
- Return type:
List[Dict[str, Any]]
- fetch_one(query, params=None)¶
Executes a raw SQL query and returns the first result as a dictionary, or None if no result.
- Parameters:
query (str)
params (Dict[str, Any] | None)
- Return type:
Dict[str, Any] | None
- execute_query(query, params=None)¶
Executes a given SQL query using the current session.
- Parameters:
query (str)
params (dict)
- Return type:
Any
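The return shapes documented for fetch_all() and fetch_one() (a list of dictionaries, and a single dictionary or None) can be demonstrated with the standard library alone. The functions below are stand-ins built on sqlite3, assuming named parameters in a dict; they are not the Database methods themselves, which take no connection argument.

```python
import sqlite3
from typing import Any, Dict, List, Optional


def fetch_all(conn: sqlite3.Connection, query: str,
              params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """Return every row as a dict keyed by column name."""
    cursor = conn.execute(query, params or {})
    names = [column[0] for column in cursor.description]
    return [dict(zip(names, row)) for row in cursor.fetchall()]


def fetch_one(conn: sqlite3.Connection, query: str,
              params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
    """Return the first row as a dict, or None when the result is empty."""
    cursor = conn.execute(query, params or {})
    row = cursor.fetchone()
    if row is None:
        return None
    names = [column[0] for column in cursor.description]
    return dict(zip(names, row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE taxon_stats (taxon_id INTEGER, year INTEGER)")
conn.executemany("INSERT INTO taxon_stats VALUES (?, ?)", [(1, 2020), (2, 2021)])

rows = fetch_all(
    conn,
    "SELECT taxon_id, year FROM taxon_stats WHERE year >= :y ORDER BY taxon_id",
    {"y": 2020},
)
missing = fetch_one(conn, "SELECT taxon_id FROM taxon_stats WHERE year = :y", {"y": 1999})
```

Binding values through params rather than string formatting keeps the query safe when the values come from user configuration.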
niamoto.common.environment module¶
- class niamoto.common.environment.Environment(config_dir, project_name=None)¶
Bases:
object
A class used to manage the environment for the Niamoto project.
- Parameters:
config_dir (str)
project_name (str)
- initialize()¶
Initialize the environment based on configuration.
- Return type:
None
- reset()¶
Reset environment by deleting DB, clearing exports (except ‘files’ directory) and logs.
- Return type:
None
niamoto.common.exceptions module¶
Custom exceptions for the Niamoto application.
- exception niamoto.common.exceptions.NiamotoError(message, details=None)¶
Bases:
Exception
Base exception class for all Niamoto errors.
- Parameters:
message (str)
details (dict | None)
- get_user_message()¶
Returns a user-friendly error message
- Return type:
str
- exception niamoto.common.exceptions.ConfigurationError(config_key, message, details=None)¶
Bases:
NiamotoError
Exception raised for configuration errors.
- Parameters:
config_key (str)
message (str)
details (dict | None)
- get_user_message()¶
Returns a user-friendly error message with details if available
- Return type:
str
- exception niamoto.common.exceptions.EnvironmentSetupError(message, details=None)¶
Bases:
NiamotoError
Exception raised for environment setup and configuration issues.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.LoggingError(message, details=None)¶
Bases:
NiamotoError
Exception raised for logging configuration and handling errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.CLIError(message, details=None)¶
Bases:
NiamotoError
Base class for CLI-related errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.CommandError(command, message, details=None)¶
Bases:
CLIError
Raised when a command fails to execute.
- Parameters:
command (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.ArgumentError(argument, message, details=None)¶
Bases:
CLIError
Raised when command arguments are invalid.
- Parameters:
argument (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.VersionError(message, details=None)¶
Bases:
CLIError
Raised when version information cannot be retrieved.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.InputError(message, details=None)¶
Bases:
NiamotoError
Exception raised for errors in input data.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.ValidationError(field, message, details=None)¶
Bases:
NiamotoError
Exception raised for validation errors.
- Parameters:
field (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.DataValidationError(message, validation_errors)¶
Bases:
NiamotoError
Exception raised for data validation errors.
- Parameters:
message (str)
validation_errors (list[dict[str, Any]])
- exception niamoto.common.exceptions.DataLoadError(message, details=None)¶
Bases:
NiamotoError
Raised when there is an error loading data.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.FileError(file_path, message, details=None)¶
Bases:
NiamotoError
Base class for file-related errors.
- Parameters:
file_path (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.FileReadError(file_path, message, details=None)¶
Bases:
FileError
Exception raised when there is an error reading a file.
- Parameters:
file_path (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.FileWriteError(file_path, message, details=None)¶
Bases:
FileError
Exception raised when there is an error writing a file.
- Parameters:
file_path (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.FileFormatError(file_path, message, details=None)¶
Bases:
FileError
Exception raised when the file format is invalid.
- Parameters:
file_path (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.CSVError(file_path, message, details=None)¶
Bases:
FileError
Exception raised for CSV-specific errors.
- Parameters:
file_path (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.DatabaseError(message, details=None)¶
Bases:
NiamotoError
Base class for database-related errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.DatabaseConnectionError(message, details=None)¶
Bases:
DatabaseError
Exception raised when the database connection fails.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.DatabaseWriteError(table_name, message, details=None)¶
Bases:
DatabaseError
Exception raised when writing to the database fails.
- Parameters:
table_name (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.DatabaseQueryError(query, message, details=None)¶
Bases:
DatabaseError
Exception raised when a database query fails.
- Parameters:
query (str)
message (str)
details (dict | None)
- get_user_message()¶
Returns a detailed user-friendly error message
- Return type:
str
- exception niamoto.common.exceptions.TransactionError(message, details=None)¶
Bases:
DatabaseError
Exception raised for transaction-related errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.DatabaseLockError(message, query=None, details=None)¶
Bases:
DatabaseQueryError
Exception raised when the database is locked by another process.
- Parameters:
message (str)
query (str | None)
details (dict | None)
- Return type:
None
- get_user_message()¶
Returns a friendlier message that omits the query context when no query is available.
- Return type:
str
- exception niamoto.common.exceptions.DataImportError(message, details=None)¶
Bases:
NiamotoError
Base class for import-related errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.TaxonomyImportError(message, details=None)¶
Bases:
DataImportError
Exception raised for taxonomy import errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.OccurrenceImportError(message, details=None)¶
Bases:
DataImportError
Exception raised for occurrence import errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.PlotImportError(message, details=None)¶
Bases:
DataImportError
Exception raised for plot import errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.ShapeImportError(message, details=None)¶
Bases:
DataImportError
Exception raised for shape import errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.ProcessError(message, details=None)¶
Bases:
NiamotoError
Base class for transform calculation errors.
- Parameters:
message (str)
details (dict | None)
- get_user_message()¶
Returns a user-friendly error message with details if available
- Return type:
str
- exception niamoto.common.exceptions.CalculationError(message, details=None)¶
Bases:
ProcessError
Exception raised for mathematical calculation errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.DataTransformError(message, details=None)¶
Bases:
ProcessError
Exception raised for errors during data processing for transforms.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.JSONEncodeError(message, details=None)¶
Bases:
DataTransformError
Exception raised for JSON encoding errors during data transformation.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.GenerationError(message, details=None)¶
Bases:
NiamotoError
Base class for content generation errors.
- Parameters:
message (str)
details (dict | None)
- exception niamoto.common.exceptions.TemplateError(template_name, message, details=None)¶
Bases:
GenerationError
Exception raised for template processing errors.
- Parameters:
template_name (str)
message (str)
details (dict | None)
- exception niamoto.common.exceptions.OutputError(output_path, message, details=None)¶
Bases:
GenerationError
Exception raised for file generation and output errors.
- Parameters:
output_path (str)
message (str)
details (dict | None)
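Because every exception above derives from NiamotoError, callers can catch at whatever level of the hierarchy suits them. The sketch below re-declares a small slice of that hierarchy locally so it runs without Niamoto installed. The constructor signatures follow the ones documented above, but the attribute names (message, details, query) and the body of get_user_message() are assumptions.

```python
from typing import Any, Dict, Optional


class NiamotoError(Exception):
    """Local stand-in for the documented base class."""

    def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.message = message        # assumed attribute name
        self.details = details or {}  # assumed attribute name

    def get_user_message(self) -> str:
        # Assumption: the real method may format the message differently.
        return self.message


class DatabaseError(NiamotoError):
    pass


class DatabaseQueryError(DatabaseError):
    def __init__(self, query: Optional[str], message: str,
                 details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message, details)
        self.query = query


class DatabaseLockError(DatabaseQueryError):
    # Documented argument order differs from the parent: message comes first.
    def __init__(self, message: str, query: Optional[str] = None,
                 details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(query, message, details)


try:
    raise DatabaseLockError("database is locked by another process")
except DatabaseError as exc:  # a handler for the base class also sees lock errors
    caught = exc
```

Catching DatabaseError here handles connection, write, query, and lock failures in one place, while a narrower except DatabaseLockError clause could be added first to retry on locks.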
niamoto.common.paths module¶
niamoto.common.progress module¶
Progress tracking utilities that can work with or without Rich.
- class niamoto.common.progress.ProgressTracker(use_progress_bar=True)¶
Bases:
object
A progress tracker that can work with or without Rich Progress bars.
- Parameters:
use_progress_bar (bool)
- track(description, total=None)¶
Context manager for tracking progress.
- Parameters:
description (str) – Description of the task
total (int | None) – Total number of items to process
- Yields:
A progress updater function
- status(message)¶
Context manager for showing a status message.
- Parameters:
message (str) – Status message to display
- niamoto.common.progress.get_progress_tracker()¶
Get the global progress tracker instance.
- Return type:
ProgressTracker
- niamoto.common.progress.set_progress_mode(use_progress_bar)¶
Set whether to use progress bars globally.
- Parameters:
use_progress_bar (bool) – True to use Rich progress bars, False to use logging
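The track() context manager is documented as yielding a progress updater function. A logging-only variant, roughly what the tracker might do when progress bars are disabled, could be sketched as follows. LoggingProgressTracker is a hypothetical stand-in, and the updater's signature (an optional advance count) is an assumption, since the reference does not specify it.

```python
import logging
from contextlib import contextmanager
from typing import Callable, Iterator, Optional

logger = logging.getLogger("progress_sketch")


class LoggingProgressTracker:
    """Hypothetical tracker that logs instead of drawing a progress bar."""

    def __init__(self) -> None:
        self.completed = 0

    @contextmanager
    def track(self, description: str,
              total: Optional[int] = None) -> Iterator[Callable[..., None]]:
        self.completed = 0
        logger.info("Starting: %s", description)

        def update(advance: int = 1) -> None:
            # Assumed updater signature: advance by a number of items.
            self.completed += advance

        try:
            yield update
        finally:
            suffix = f"/{total}" if total is not None else ""
            logger.info("Finished: %s (%d%s)", description, self.completed, suffix)


tracker = LoggingProgressTracker()
with tracker.track("Importing occurrences", total=3) as update:
    for _ in range(3):
        update()
```

Yielding a plain function keeps calling code identical whether or not a progress bar is drawn.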