niamoto.common package

Subpackages

Submodules

niamoto.common.config module

class niamoto.common.config.Config(config_dir=None, create_default=True)

Bases: object

Class to manage all Niamoto configuration files:
  • config.yml (global env settings: database, logs, outputs, etc.)

  • import.yml (data sources)

  • transform.yml (transformations)

  • export.yml (widgets)

Parameters:
  • config_dir (str | None)

  • create_default (bool)

classmethod clear_cache()

Clear in-process config caches. Mainly useful for tests.

Return type:

None

static get_niamoto_home()

Return the Niamoto home directory.

This method checks if the ‘NIAMOTO_HOME’ environment variable is set. If it is, returns that path; otherwise, falls back to the current working directory.

Return type:

str
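
The lookup described above can be sketched with the standard library alone. This is a minimal illustration, not Niamoto's implementation: the function name `resolve_niamoto_home` and its `env` parameter are hypothetical, and treating an empty value like an unset variable is an assumption.

```python
import os
from typing import Mapping, Optional


def resolve_niamoto_home(env: Optional[Mapping[str, str]] = None) -> str:
    """Return NIAMOTO_HOME if it is set, else the current working directory."""
    # `env` is a hypothetical injection point so the lookup can be exercised
    # without mutating the real process environment.
    environ = os.environ if env is None else env
    home = environ.get("NIAMOTO_HOME")
    # Assumption: an empty value is treated the same as an unset variable.
    return home if home else os.getcwd()


print(resolve_niamoto_home({"NIAMOTO_HOME": "/srv/niamoto"}))
print(resolve_niamoto_home({}) == os.getcwd())
```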

property database_path: str

Get the database path from config.yml.

Returns:

Database path, resolved to an absolute path.

Return type:

str

property logs_path: str

Get the logs path from config.yml.

Returns:

Logs path.

Return type:

str

property get_export_config: Dict[str, str]

Get the output paths from config.yml.

Returns:

Output paths.

Return type:

Dict[str, str]

property get_deploy_config: Dict[str, Any]

Get the deploy configuration from deploy.yml (optional).

Returns:

Dict with keys platform, project_name, branch, extra. Empty dict if deploy.yml does not exist.

Return type:

Dict[str, Any]

property get_imports_config: GenericImportConfig

Get the generic import configuration from import.yml.

Returns:

Typed import configuration with entities.references and entities.datasets

Return type:

GenericImportConfig

Raises:

ConfigurationError – If import.yml is missing or doesn’t follow the entities schema

get_transforms_config()

Get the transformations config from transform.yml.

Returns:

Transformations config.

Return type:

List[Dict[str, Any]]

get_exports_config()

Get the exports config from export.yml.

Returns:

Exports config.

Return type:

List[Dict[str, Any]]

niamoto.common.database module

This module provides a Database class for connecting to and interacting with a database.

The Database class offers methods to establish a connection, get new sessions, add instances to the database, and close sessions.

class niamoto.common.database.Database(db_path, optimize=True, engine=None, read_only=False)

Bases: object

A class that provides a connection to a database and offers methods to interact with it.

Parameters:
  • db_path (str)

  • optimize (bool)

  • engine (Engine | None)

  • read_only (bool)

enable_connection_reuse()

Enable per-thread connection reuse for hot read/write loops.

Return type:

None

disable_connection_reuse()

Disable and close any reused connection for the current thread.

Return type:

None

connection()

Yield a SQLAlchemy connection, optionally reused within the current thread.

Return type:

Iterator[Connection]
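
To illustrate how per-thread reuse can interact with a connection context manager, here is a minimal sketch built on `sqlite3` and `threading.local` rather than SQLAlchemy. The class name `PerThreadConnections` and its internals are hypothetical; only the three method names mirror the ones documented here, and the real implementation may differ.

```python
import sqlite3
import threading
from contextlib import contextmanager
from typing import Iterator


class PerThreadConnections:
    """Hypothetical stand-in showing opt-in, per-thread connection reuse."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._local = threading.local()  # each thread sees its own attributes
        self._reuse = False

    def enable_connection_reuse(self) -> None:
        self._reuse = True

    def disable_connection_reuse(self) -> None:
        # Turn reuse off and close the connection cached for this thread.
        self._reuse = False
        conn = getattr(self._local, "conn", None)
        if conn is not None:
            conn.close()
            self._local.conn = None

    @contextmanager
    def connection(self) -> Iterator[sqlite3.Connection]:
        if self._reuse:
            conn = getattr(self._local, "conn", None)
            if conn is None:
                conn = sqlite3.connect(self._path)
                self._local.conn = conn
            yield conn  # kept open for the next call on this thread
        else:
            conn = sqlite3.connect(self._path)
            try:
                yield conn
            finally:
                conn.close()  # one-shot connection, closed on exit
```

With reuse enabled, consecutive `with pool.connection()` blocks on the same thread receive the same object, which avoids reconnecting inside tight loops.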

create_indexes_for_table(table_name, index_columns=None)

Create indexes on a dynamically created table.

This method is designed for tables created during import/transform phases where table names are defined by user configuration.

Parameters:
  • table_name (str) – Name of the dynamically created table

  • index_columns (List[str]) – List of columns to index. If None, columns are auto-detected:
      - Columns ending with _id (likely foreign keys)
      - Common query columns (id, type, name, etc.)

Return type:

None

Example

    # After creating the 'occurrences' table from CSV
    db.create_indexes_for_table('occurrences')

    # After a transform creates the 'taxon_stats' table
    db.create_indexes_for_table('taxon_stats', ['taxon_id', 'year'])
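
The auto-detection rule can be sketched against plain `sqlite3`. This is an illustration under stated assumptions: the helper names, the `idx_<table>_<column>` naming scheme, and the exact set of "common query columns" (only `id`, `type`, and `name` are named in the description) are hypothetical and may not match what Niamoto does.

```python
import sqlite3
from typing import List, Optional

# Assumption: only the columns explicitly named in the description above.
COMMON_QUERY_COLUMNS = {"id", "type", "name"}


def pick_index_columns(columns: List[str]) -> List[str]:
    """Keep columns ending in _id plus the common query columns."""
    return [c for c in columns if c.endswith("_id") or c in COMMON_QUERY_COLUMNS]


def create_indexes(
    conn: sqlite3.Connection, table: str, index_columns: Optional[List[str]] = None
) -> List[str]:
    # Identifiers are interpolated, so `table` must come from trusted config.
    if index_columns is None:
        info = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
        index_columns = pick_index_columns([row[1] for row in info])
    for column in index_columns:
        conn.execute(
            f'CREATE INDEX IF NOT EXISTS "idx_{table}_{column}" '
            f'ON "{table}" ("{column}")'
        )
    return index_columns


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE occurrences (id INTEGER, taxon_id INTEGER, year INTEGER, locality TEXT)"
)
print(create_indexes(conn, "occurrences"))
```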

optimize_all_tables()

Optimize all tables in the database by creating missing indexes. Useful after import/transform operations that create new tables.

Return type:

None

has_table(table_name)

Check if a table exists in the database.

Parameters:

table_name (str) – The name of the table to check.

Returns:

True if the table exists, False otherwise.

Return type:

bool

invalidate_table_names_cache()

Invalidate cached table names (call after DDL operations).

Return type:

None

get_table_names(max_age_seconds=2.0)

Return table names using the backend-safe cache path.

Parameters:

max_age_seconds (float)

Return type:

List[str]
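
The pairing of `get_table_names(max_age_seconds=...)` with `invalidate_table_names_cache()` suggests a time-bounded cache. The sketch below shows one way such a cache can work; `TableNameCache`, its `loader` callback, and the injectable `clock` are hypothetical names, and nothing here is taken from Niamoto's source.

```python
import time
from typing import Callable, List, Optional


class TableNameCache:
    """Hypothetical cache that reloads table names once they are too old."""

    def __init__(
        self,
        loader: Callable[[], List[str]],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader  # e.g. a function that inspects the database
        self._clock = clock
        self._names: Optional[List[str]] = None
        self._loaded_at = 0.0

    def get(self, max_age_seconds: float = 2.0) -> List[str]:
        now = self._clock()
        stale = self._names is None or now - self._loaded_at > max_age_seconds
        if stale:
            self._names = list(self._loader())
            self._loaded_at = now
        return list(self._names)  # copy so callers cannot mutate the cache

    def invalidate(self) -> None:
        # Call after DDL so the next get() reloads immediately.
        self._names = None
```

A short maximum age keeps repeated `has_table`-style checks cheap, while explicit invalidation covers the case where a table was just created or dropped.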

get_new_session()

Get a new session from the session factory.

Returns:

A new session.

Return type:

scoped_session[Session]

add_instance_and_commit(instance)

Add an instance to the session and commit.

Parameters:

instance (Any) – The instance to be added.

Return type:

None

execute_select(sql)

Execute a SELECT query using the database engine.

Parameters:

sql (str) – A string containing the SELECT query to be executed.

Returns:

The result of the query if successful, None otherwise.

Return type:

Optional[Any]

Note

For DuckDB, this method disposes of the connection pool before executing queries to ensure we see the latest committed data. This is necessary because DuckDB uses in-process connections that may cache database state.

execute_sql(sql, params=None, fetch=False, *, fetch_all=False)

Execute a raw SQL query using the database engine.

Parameters:
  • sql (str) – A string containing the SQL query to be executed.

  • params (Dict[str, Any], optional) – Parameters to bind to the query.

  • fetch (bool) – Whether to fetch one row from the result set if the query returns data.

  • fetch_all (bool)

Returns:

The result of the query if fetch is True and a result is available, or None otherwise.

Return type:

Optional[Any]

commit_session()

Commit the current session to the database.

Return type:

None

rollback_session()

Rollback the current session, discarding any uncommitted changes.

Return type:

None

close_db_session()

Close the database session.

Return type:

None

begin_transaction()

Begin a new transaction.

Return type:

None

commit_transaction()

Commit the current transaction.

Return type:

None

rollback_transaction()

Rollback the current transaction.

Return type:

None

get_table_columns(table_name)

Return column names for the given table.

Parameters:

table_name (str)

Return type:

List[str]

get_columns(table_name)

Return SQLAlchemy inspector-like column metadata for a table.

Parameters:

table_name (str)

Return type:

List[Dict[str, Any]]

get_table_schema(table_name)

Return table schema as list of {name, type} dictionaries.

Parameters:

table_name (str)

Return type:

List[Dict[str, Any]]

optimize_database()

Run database optimization routines.

This method can be called periodically to maintain optimal performance:
  • For SQLite: applies PRAGMA optimizations, ANALYZE, and VACUUM

  • For DuckDB: runs CHECKPOINT and VACUUM to compact the database

  • Creates any missing indexes (SQLite only; DuckDB doesn’t support index reflection)

Return type:

None

get_database_stats()

Get database statistics and performance metrics.

Returns:

Dict containing database size, page count, cache statistics, etc.

Return type:

Dict[str, Any]

fetch_all(query, params=None)

Executes a raw SQL query and returns all results as a list of dictionaries.

Parameters:
  • query (str)

  • params (Dict[str, Any] | None)

Return type:

List[Dict[str, Any]]

fetch_one(query, params=None)

Executes a raw SQL query and returns the first result as a dictionary, or None if no result.

Parameters:
  • query (str)

  • params (Dict[str, Any] | None)

Return type:

Dict[str, Any] | None
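
The return shapes of `fetch_all` and `fetch_one` can be illustrated with `sqlite3` and named parameters. The module-level functions below are hypothetical stand-ins that take an explicit connection; the real methods run through the `Database` instance, and the `:name` placeholder style is an assumption about how `params` are bound.

```python
import sqlite3
from typing import Any, Dict, List, Optional


def fetch_all(
    conn: sqlite3.Connection, query: str, params: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """Run a query and return every row as a column-name -> value dict."""
    cursor = conn.execute(query, params or {})
    columns = [col[0] for col in cursor.description or []]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


def fetch_one(
    conn: sqlite3.Connection, query: str, params: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, Any]]:
    """Run a query and return the first row as a dict, or None if empty."""
    cursor = conn.execute(query, params or {})
    row = cursor.fetchone()
    if row is None:
        return None
    return dict(zip([col[0] for col in cursor.description], row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE taxon_stats (taxon_id INTEGER, year INTEGER)")
conn.executemany("INSERT INTO taxon_stats VALUES (?, ?)", [(1, 2019), (2, 2021)])
print(fetch_all(conn, "SELECT * FROM taxon_stats WHERE year >= :y", {"y": 2020}))
```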

execute_query(query, params=None)

Executes a given SQL query using the current session.

Parameters:
  • query (str)

  • params (dict | None)

Return type:

Any

niamoto.common.environment module

class niamoto.common.environment.Environment(config_dir, project_name=None)

Bases: object

A class used to manage the environment for the Niamoto project.

Parameters:
  • config_dir (str)

  • project_name (str | None)

initialize()

Initialize the environment based on configuration.

Return type:

None

reset()

Reset environment by deleting DB, clearing exports (except ‘files’ directory) and logs.

Return type:

None
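
The reset behaviour described above can be sketched with `pathlib` and `shutil`. This is an illustration only: `reset_environment` and `_clear_directory` are hypothetical helpers taking explicit paths, whereas the real method reads them from configuration, and emptying the logs directory (rather than removing it) is an assumption.

```python
import shutil
from pathlib import Path
from typing import Iterable


def _clear_directory(directory: Path, keep: Iterable[str] = ()) -> None:
    """Delete everything inside `directory` except entries named in `keep`."""
    if not directory.is_dir():
        return
    kept = set(keep)
    for entry in directory.iterdir():
        if entry.name in kept:
            continue
        if entry.is_dir():
            shutil.rmtree(entry)
        else:
            entry.unlink()


def reset_environment(db_path: Path, exports_dir: Path, logs_dir: Path) -> None:
    # 1. Delete the database file if it exists.
    if db_path.exists():
        db_path.unlink()
    # 2. Clear exports but preserve the 'files' directory.
    _clear_directory(exports_dir, keep=("files",))
    # 3. Clear logs (assumption: the directory itself is kept).
    _clear_directory(logs_dir)
```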

niamoto.common.exceptions module

Custom exceptions for the Niamoto application.

exception niamoto.common.exceptions.NiamotoError(message, details=None)

Bases: Exception

Base exception class for all Niamoto errors.

Parameters:
  • message (str)

  • details (dict | None)

get_user_message()

Returns a user-friendly error message.

Return type:

str

exception niamoto.common.exceptions.ConfigurationError(config_key, message, details=None)

Bases: NiamotoError

Exception raised for configuration errors.

Parameters:
  • config_key (str)

  • message (str)

  • details (dict | None)

get_user_message()

Returns a user-friendly error message with details if available.

Return type:

str
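
The constructor signatures in this module follow a common pattern: a base exception carrying `message` and `details`, with subclasses adding one context argument in front. The sketch below mirrors that pattern with hypothetical `*Sketch` classes; the attribute names and the exact text produced by `get_user_message()` are assumptions, not the library's actual output.

```python
from typing import Any, Dict, Optional


class NiamotoErrorSketch(Exception):
    """Hypothetical base class mirroring NiamotoError(message, details=None)."""

    def __init__(self, message: str, details: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.message = message
        self.details = details or {}

    def get_user_message(self) -> str:
        return self.message


class ConfigurationErrorSketch(NiamotoErrorSketch):
    """Hypothetical subclass adding a leading config_key argument."""

    def __init__(
        self, config_key: str, message: str, details: Optional[Dict[str, Any]] = None
    ) -> None:
        super().__init__(message, details)
        self.config_key = config_key

    def get_user_message(self) -> str:
        # Assumed format: append details in parentheses when present.
        if not self.details:
            return self.message
        extras = ", ".join(f"{key}={value}" for key, value in self.details.items())
        return f"{self.message} ({extras})"


try:
    raise ConfigurationErrorSketch(
        "database.path", "Missing database path", {"file": "config.yml"}
    )
except NiamotoErrorSketch as exc:  # catching the base also catches subclasses
    print(exc.get_user_message())
```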

exception niamoto.common.exceptions.EnvironmentSetupError(message, details=None)

Bases: NiamotoError

Exception raised for environment setup and configuration issues.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.LoggingError(message, details=None)

Bases: NiamotoError

Exception raised for logging configuration and handling errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.CLIError(message, details=None)

Bases: NiamotoError

Base class for CLI related errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.CommandError(command, message, details=None)

Bases: CLIError

Raised when a command fails to execute.

Parameters:
  • command (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.ArgumentError(argument, message, details=None)

Bases: CLIError

Raised when command arguments are invalid.

Parameters:
  • argument (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.VersionError(message, details=None)

Bases: CLIError

Raised when version information cannot be retrieved.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.InputError(message, details=None)

Bases: NiamotoError

Exception raised for errors in input data.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.ValidationError(field, message, details=None)

Bases: NiamotoError

Exception raised for validation errors.

Parameters:
  • field (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.DataValidationError(message, validation_errors)

Bases: NiamotoError

Exception raised for data validation errors.

Parameters:
  • message (str)

  • validation_errors (list[dict[str, Any]])

exception niamoto.common.exceptions.DataLoadError(message, details=None)

Bases: NiamotoError

Raised when there is an error loading data.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.FileError(file_path, message, details=None)

Bases: NiamotoError

Base class for file related errors.

Parameters:
  • file_path (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.FileReadError(file_path, message, details=None)

Bases: FileError

Exception raised when there is an error reading a file.

Parameters:
  • file_path (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.FileWriteError(file_path, message, details=None)

Bases: FileError

Exception raised when there is an error writing a file.

Parameters:
  • file_path (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.FileFormatError(file_path, message, details=None)

Bases: FileError

Exception raised when file format is invalid.

Parameters:
  • file_path (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.CSVError(file_path, message, details=None)

Bases: FileError

Exception raised for CSV-specific errors.

Parameters:
  • file_path (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.DatabaseError(message, details=None)

Bases: NiamotoError

Base class for database related errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.DatabaseConnectionError(message, details=None)

Bases: DatabaseError

Exception raised when database connection fails.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.DatabaseWriteError(table_name, message, details=None)

Bases: DatabaseError

Exception raised when writing to database fails.

Parameters:
  • table_name (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.DatabaseQueryError(query, message, details=None)

Bases: DatabaseError

Exception raised when a database query fails.

Parameters:
  • query (str)

  • message (str)

  • details (dict | None)

get_user_message()

Returns a detailed user-friendly error message.

Return type:

str

exception niamoto.common.exceptions.TransactionError(message, details=None)

Bases: DatabaseError

Exception raised for transaction-related errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.DatabaseLockError(message, query=None, details=None)

Bases: DatabaseQueryError

Exception raised when the database is locked by another process.

Parameters:
  • message (str)

  • query (str | None)

  • details (dict | None)

get_user_message()

Return friendlier message without query context when absent.

Return type:

str

exception niamoto.common.exceptions.DataImportError(message, details=None)

Bases: NiamotoError

Base class for import related errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.TaxonomyImportError(message, details=None)

Bases: DataImportError

Exception raised for taxonomy import errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.OccurrenceImportError(message, details=None)

Bases: DataImportError

Exception raised for occurrence import errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.PlotImportError(message, details=None)

Bases: DataImportError

Exception raised for plot import errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.ShapeImportError(message, details=None)

Bases: DataImportError

Exception raised for shape import errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.ProcessError(message, details=None)

Bases: NiamotoError

Base class for transform calculation errors.

Parameters:
  • message (str)

  • details (dict | None)

get_user_message()

Returns a user-friendly error message with details if available.

Return type:

str

exception niamoto.common.exceptions.CalculationError(message, details=None)

Bases: ProcessError

Exception raised for mathematical calculation errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.DataTransformError(message, details=None)

Bases: ProcessError

Exception raised for errors during data processing for transforms.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.JSONEncodeError(message, details=None)

Bases: DataTransformError

Exception raised for JSON encoding errors during data transformation.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.GenerationError(message, details=None)

Bases: NiamotoError

Base class for content generation errors.

Parameters:
  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.TemplateError(template_name, message, details=None)

Bases: GenerationError

Exception raised for template processing errors.

Parameters:
  • template_name (str)

  • message (str)

  • details (dict | None)

exception niamoto.common.exceptions.OutputError(output_path, message, details=None)

Bases: GenerationError

Exception raised for file generation and output errors.

Parameters:
  • output_path (str)

  • message (str)

  • details (dict | None)

niamoto.common.paths module

niamoto.common.progress module

Progress tracking utilities that can work with or without Rich.

class niamoto.common.progress.ProgressTracker(use_progress_bar=True)

Bases: object

A progress tracker that can work with or without Rich Progress bars.

Parameters:

use_progress_bar (bool)

track(description, total=None)

Context manager for tracking progress.

Parameters:
  • description (str) – Description of the task

  • total (int | None) – Total number of items to process

Yields:

A progress updater function
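
A context manager that yields an updater function can be sketched without Rich, falling back to logging. `LoggingTracker` is a hypothetical stand-in for the non-Rich code path; the updater's `advance` parameter and the `completed` attribute are assumptions, since the signature of the yielded function is not documented here.

```python
import logging
from contextlib import contextmanager
from typing import Callable, Iterator, Optional

logger = logging.getLogger("progress_sketch")


class LoggingTracker:
    """Hypothetical tracker that reports progress through logging only."""

    def __init__(self) -> None:
        self.completed = 0

    @contextmanager
    def track(
        self, description: str, total: Optional[int] = None
    ) -> Iterator[Callable[..., None]]:
        self.completed = 0
        logger.info("Started: %s", description)

        def update(advance: int = 1) -> None:
            # Count processed items; log a ratio only when total is known.
            self.completed += advance
            if total:
                logger.info("%s: %d/%d", description, self.completed, total)

        try:
            yield update
        finally:
            logger.info("Finished: %s (%d items)", description, self.completed)


tracker = LoggingTracker()
with tracker.track("Importing occurrences", total=3) as update:
    for _ in range(3):
        update()
```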

status(message)

Context manager for showing a status message.

Parameters:

message (str) – Status message to display

niamoto.common.progress.get_progress_tracker()

Get the global progress tracker instance.

Return type:

ProgressTracker

niamoto.common.progress.set_progress_mode(use_progress_bar)

Set whether to use progress bars globally.

Parameters:

use_progress_bar (bool) – True to use Rich progress bars, False to use logging

Module contents