resoterre package

Machine learning tools and extensions for climate science.

Submodules

resoterre.cli module

Console script for resoterre.

resoterre.config_utils module

Utilities for handling configuration classes.

resoterre.config_utils.assign_custom_class_to_config_dict(config_dict: dict[Any, Any], known_custom_config_dict: dict[str, Any] | None = None, type_key: str = 'type') -> None

Recursively assign custom classes to a configuration dictionary.

Parameters

config_dict : dict

The configuration dictionary to process.

known_custom_config_dict : dict, optional

A dictionary mapping names to configuration classes. Defaults to the global known_configs.

type_key : str

The key in the configuration dictionary that indicates the class name to assign.

Returns

None

The function modifies the config_dict in place.
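The recursive, in-place pattern described above can be illustrated with a small standalone sketch. It does not use resoterre itself: `assign_classes_sketch` and `OptimizerConfig` are hypothetical names, and the sketch assumes that "assigning" a class means replacing a nested dictionary that carries the type key with an instance of the matching class. The real function may behave differently.

```python
from dataclasses import dataclass


@dataclass
class OptimizerConfig:  # hypothetical configuration class, for illustration only
    learning_rate: float = 1e-3


def assign_classes_sketch(config_dict: dict, known: dict, type_key: str = "type") -> None:
    """Replace nested dicts that carry `type_key` with class instances, in place."""
    for key, value in list(config_dict.items()):
        if not isinstance(value, dict):
            continue
        # Process deeper levels first so inner objects exist before outer ones are built.
        assign_classes_sketch(value, known, type_key)
        if type_key in value:
            kwargs = {k: v for k, v in value.items() if k != type_key}
            config_dict[key] = known[value[type_key]](**kwargs)


config = {"name": "run-1", "optimizer": {"type": "adam", "learning_rate": 0.01}}
assign_classes_sketch(config, {"adam": OptimizerConfig})
```

After the call, `config["optimizer"]` holds an `OptimizerConfig` instance and the other keys are unchanged.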

resoterre.config_utils.config_from_yaml(config_cls: Any, yaml_obj: dict[str, Any] | Path | str, known_custom_config_dict: dict[str, Any] | None = None, type_key: str = 'type') -> Any

Create a configuration object from a YAML object, handling custom classes.

Parameters

config_cls : type

The configuration class to instantiate.

yaml_obj : dict | Path | str

The YAML object to convert.

known_custom_config_dict : dict, optional

A dictionary mapping names to configuration classes. Defaults to the global known_configs.

type_key : str

The key in the configuration dictionary that indicates the class name to assign.

Returns

config_cls

An instance of the configuration class populated from the YAML object.

resoterre.config_utils.register_config(name: str, known_configs_dict: dict[str, Any] | None = None, overwrite: bool = False) -> Any

Decorator to register a configuration class with a given name.

Parameters

name : str

The name to register the configuration class under.

known_configs_dict : dict, optional

The dictionary to register the configuration class in. Defaults to the global known_configs.

overwrite : bool

Whether overwriting an existing configuration with the same name is allowed.

Returns

decorator

A decorator that registers the configuration class.
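A registration decorator of this kind is commonly written as a function returning a closure. The following is a minimal sketch of that pattern, not the package's implementation: `register_config_sketch`, the module-level `known_configs` stand-in, and `UNetConfig` are hypothetical, and raising `ValueError` on a duplicate name is an assumption about what happens when overwrite is False.

```python
from typing import Any, Callable, Optional

known_configs: dict = {}  # stand-in for a package-level registry (assumption)


def register_config_sketch(
    name: str,
    known_configs_dict: Optional[dict] = None,
    overwrite: bool = False,
) -> Callable[[Any], Any]:
    """Return a class decorator that stores the class under `name`."""
    registry = known_configs if known_configs_dict is None else known_configs_dict

    def decorator(cls: Any) -> Any:
        # Assumed behaviour: refuse to silently replace an existing entry.
        if name in registry and not overwrite:
            raise ValueError(f"A configuration named {name!r} is already registered.")
        registry[name] = cls
        return cls  # the class itself is returned unchanged

    return decorator


local_registry: dict = {}


@register_config_sketch("unet", known_configs_dict=local_registry)
class UNetConfig:  # hypothetical configuration class
    depth = 4
```

Passing a local dictionary, as in the usage above, keeps the example from touching any shared registry.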

resoterre.io_utils module

Utility functions for input/output operations.

resoterre.io_utils.get_yaml_dict(yaml_obj: dict[str, Any] | Path | str) -> dict[str, Any]

Get a dictionary from a YAML object or file.

Parameters

yaml_obj : dict | Path | str

A dictionary or a path to a YAML file.

Returns

dict[str, Any]

The dictionary obtained from the YAML file, or the input dictionary itself.

resoterre.io_utils.override_config_paths(config: dict[str, Any] | Path | str, overrides: dict[str, Path | str | None]) -> dict[str, Any]

Override multiple paths in a configuration dictionary.

Parameters

config : dict[str, Any] | Path | str

A dictionary or a path to a YAML file.

overrides : dict[str, Path | str | None]

Dictionary of key-value pairs to override. If a value is None, no override is performed for that key.

Returns

dict[str, Any]

The config dictionary with the overridden paths.
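The rule that a None value leaves the existing entry untouched can be sketched as follows. This is an illustration only: `override_paths_sketch` is a hypothetical name, it handles only an already-loaded dictionary with top-level keys, and returning a shallow copy rather than mutating the input is an assumption.

```python
from pathlib import Path
from typing import Any, Optional, Union


def override_paths_sketch(
    config: dict,
    overrides: dict,
) -> dict:
    """Apply non-None overrides to a copy of `config` (top-level keys only)."""
    updated: dict = dict(config)  # shallow copy so the caller's dict is preserved
    for key, value in overrides.items():
        if value is not None:  # None means "keep whatever the config already has"
            updated[key] = value
    return updated


result = override_paths_sketch(
    {"data_dir": "/data/default", "log_dir": "/logs/default"},
    {"data_dir": Path("/data/experiment"), "log_dir": None},
)
```

Here `result["data_dir"]` is replaced while `result["log_dir"]` keeps its original value.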

resoterre.io_utils.purge_files(path: Path | str, pattern: str, older_than: float | None = None, more_than: int | None = None, must_both_be_true: bool = False, recursive: bool = False, safe: bool = True, excludes: list[str] | None = None) -> list[str]

Purge files in a directory based on age and/or quantity criteria.

Parameters

path : Path | str

The directory path where files are located.

pattern : str

The glob pattern to match files.

older_than : float, optional

Age in seconds; files older than this will be purged.

more_than : int, optional

If the number of matching files exceeds this number, the oldest files will be purged.

must_both_be_true : bool

If True, files must meet both criteria to be purged; if False, meeting either criterion is sufficient.

recursive : bool

If True, search for files recursively in subdirectories.

safe : bool

If True, enables safe mode which restricts certain operations.

excludes : list[str], optional

List of file paths to exclude from purging.

Returns

list[str]

List of file paths that were purged.
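How the age and quantity criteria combine can be made concrete with a selection-only sketch that deletes nothing. `select_files_to_purge` is a hypothetical helper operating on (name, modification time) pairs; it assumes both criteria are supplied when must_both_be_true is True, and it ignores the recursive, safe and excludes options.

```python
import time
from typing import Optional


def select_files_to_purge(
    files_with_mtime: list,
    older_than: Optional[float] = None,
    more_than: Optional[int] = None,
    must_both_be_true: bool = False,
    now: Optional[float] = None,
) -> list:
    """Return the names that would be purged, oldest first."""
    now = time.time() if now is None else now
    ordered = sorted(files_with_mtime, key=lambda item: item[1])  # oldest first

    # Age criterion: files whose age in seconds exceeds `older_than`.
    too_old = {
        name for name, mtime in ordered
        if older_than is not None and now - mtime > older_than
    }
    # Quantity criterion: the oldest files beyond the `more_than` allowance.
    n_excess = max(len(ordered) - more_than, 0) if more_than is not None else 0
    excess = {name for name, _ in ordered[:n_excess]}

    selected = (too_old & excess) if must_both_be_true else (too_old | excess)
    return [name for name, _ in ordered if name in selected]


files = [("a.log", 0.0), ("b.log", 50.0), ("c.log", 90.0)]
either = select_files_to_purge(files, older_than=30, more_than=2, now=100.0)
both = select_files_to_purge(files, older_than=30, more_than=2, must_both_be_true=True, now=100.0)
```

With these inputs, `either` contains the two files older than 30 seconds, while `both` contains only the single file that is also in excess of the two-file allowance.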

resoterre.logging_utils module

Utilities for logging.

class resoterre.logging_utils.CustomLogging(caller: Any = None, queue: Queue[tuple[str, str]] | None = None, quick_repetition_tolerance: int = 1)

Bases: object

Custom logging class to handle logging with repetition overload protection.

Parameters

caller : Any, optional

The logger to use. Defaults to the root logger.

queue : queue.Queue[tuple[str, str]], optional

A queue to store log messages.

quick_repetition_tolerance : int

Number of quick repetitions allowed before blocking further messages.

critical(message: str, *args: Any, block_short_repetition_delay: int = 0, identifier: str = '', stacklevel: int = 0, expected_nb_of_calls: int = 1, add_eta: bool = False) -> bool

Log a critical message.

Parameters

message : str

The message to log.

*args : Any

Additional arguments for the logging method.

block_short_repetition_delay : int

Time in seconds to block repeated messages.

identifier : str

Unique identifier for the message.

stacklevel : int

Stack level for logging.

expected_nb_of_calls : int

Expected number of calls for ETA calculation.

add_eta : bool

Whether to add ETA information to the message.

Returns

bool

True if the message was logged, False if it was blocked.

debug(message: str, *args: Any, block_short_repetition_delay: int = 0, identifier: str = '', stacklevel: int = 0, expected_nb_of_calls: int = 1, add_eta: bool = False) -> bool

Log a debug message.

Parameters

message : str

The message to log.

*args : Any

Additional arguments for the logging method.

block_short_repetition_delay : int

Time in seconds to block repeated messages.

identifier : str

Unique identifier for the message.

stacklevel : int

Stack level for logging.

expected_nb_of_calls : int

Expected number of calls for ETA calculation.

add_eta : bool

Whether to add ETA information to the message.

Returns

bool

True if the message was logged, False if it was blocked.

error(message: str, *args: Any, block_short_repetition_delay: int = 0, identifier: str = '', stacklevel: int = 0, expected_nb_of_calls: int = 1, add_eta: bool = False) -> bool

Log an error message.

Parameters

message : str

The message to log.

*args : Any

Additional arguments for the logging method.

block_short_repetition_delay : int

Time in seconds to block repeated messages.

identifier : str

Unique identifier for the message.

stacklevel : int

Stack level for logging.

expected_nb_of_calls : int

Expected number of calls for ETA calculation.

add_eta : bool

Whether to add ETA information to the message.

Returns

bool

True if the message was logged, False if it was blocked.

info(message: str, *args: Any, block_short_repetition_delay: int = 0, identifier: str = '', stacklevel: int = 0, expected_nb_of_calls: int = 1, add_eta: bool = False) -> bool

Log an info message.

Parameters

message : str

The message to log.

*args : Any

Additional arguments for the logging method.

block_short_repetition_delay : int

Time in seconds to block repeated messages.

identifier : str

Unique identifier for the message.

stacklevel : int

Stack level for logging.

expected_nb_of_calls : int

Expected number of calls for ETA calculation.

add_eta : bool

Whether to add ETA information to the message.

Returns

bool

True if the message was logged, False if it was blocked.

log(caller: Callable[[...], None], message: str, *args: Any, block_short_repetition_delay: int = 0, identifier: str = '', stacklevel: int = 0, expected_nb_of_calls: int = 1, add_eta: bool = False) -> bool

Log a message with repetition overload protection.

Parameters

caller : Callable[..., None]

The logging method to use (e.g., logger.info).

message : str

The message to log.

*args : Any

Additional arguments for the logging method.

block_short_repetition_delay : int

Time in seconds to block repeated messages.

identifier : str

Unique identifier for the message.

stacklevel : int

Stack level for logging.

expected_nb_of_calls : int

Expected number of calls for ETA calculation.

add_eta : bool

Whether to add ETA information to the message.

Returns

bool

True if the message was logged, False if it was blocked.
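The idea of repetition overload protection can be sketched independently of the class above. `RepetitionGuard` is a hypothetical helper, not part of resoterre; it assumes that a "quick repetition" is a call with the same identifier arriving within block_short_repetition_delay seconds of the previous one, and that quick_repetition_tolerance such calls are let through before blocking starts. The injectable clock exists only to make the behaviour easy to check.

```python
import time
from typing import Callable


class RepetitionGuard:
    """Decide whether a message identified by `identifier` should be emitted."""

    def __init__(self, quick_repetition_tolerance: int = 1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.tolerance = quick_repetition_tolerance
        self.clock = clock
        self._last_seen: dict = {}    # identifier -> time of the previous call
        self._quick_count: dict = {}  # identifier -> consecutive quick repetitions

    def should_log(self, identifier: str, block_short_repetition_delay: int = 0) -> bool:
        now = self.clock()
        last = self._last_seen.get(identifier)
        self._last_seen[identifier] = now
        quick = (
            block_short_repetition_delay > 0
            and last is not None
            and now - last < block_short_repetition_delay
        )
        if not quick:
            self._quick_count[identifier] = 0  # enough time has passed: reset
            return True
        self._quick_count[identifier] = self._quick_count.get(identifier, 0) + 1
        return self._quick_count[identifier] <= self.tolerance


fake_time = [0.0]
guard = RepetitionGuard(quick_repetition_tolerance=1, clock=lambda: fake_time[0])
decisions = []
for t in (0.0, 1.0, 2.0, 20.0):
    fake_time[0] = t
    decisions.append(guard.should_log("loss-nan", block_short_repetition_delay=10))
```

In this run the first call and one quick repetition pass, the next quick repetition is blocked, and the call after a long pause passes again.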

warning(message: str, *args: Any, block_short_repetition_delay: int = 0, identifier: str = '', stacklevel: int = 0, expected_nb_of_calls: int = 1, add_eta: bool = False) -> bool

Log a warning message.

Parameters

message : str

The message to log.

*args : Any

Additional arguments for the logging method.

block_short_repetition_delay : int

Time in seconds to block repeated messages.

identifier : str

Unique identifier for the message.

stacklevel : int

Stack level for logging.

expected_nb_of_calls : int

Expected number of calls for ETA calculation.

add_eta : bool

Whether to add ETA information to the message.

Returns

bool

True if the message was logged, False if it was blocked.

resoterre.logging_utils.default_basic_config_args(basic_config_args: dict[str, Any], show_logger_name: bool = False, show_date: bool = True) -> dict[str, Any]

Create a default set of basicConfig arguments for logging.

Parameters

basic_config_args : dict[str, Any]

Basic configuration arguments to customize.

show_logger_name : bool

Whether to include the logger name in the log format.

show_date : bool

Whether to include the date in the log format.

Returns

dict[str, Any]

A dictionary of basicConfig arguments for logging.

resoterre.logging_utils.readable_delta_t(delta_t: int | float) -> str

Return a human-readable string of a time delta.

Parameters

delta_t : int | float

Time delta in seconds.

Returns

str

A convenient human-readable string of the time delta.
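The exact output format is not specified here. As an illustration of the general idea, the sketch below picks the two largest relevant units for a non-negative duration; `readable_delta_t_sketch` and its format (for example `2m05s`) are assumptions and may not match what the package returns.

```python
def readable_delta_t_sketch(delta_t: float) -> str:
    """Format a non-negative duration in seconds using its two largest units."""
    total_seconds = int(round(delta_t))
    if total_seconds < 60:
        return f"{total_seconds}s"
    minutes, seconds = divmod(total_seconds, 60)
    if minutes < 60:
        return f"{minutes}m{seconds:02d}s"
    hours, minutes = divmod(minutes, 60)
    if hours < 24:
        return f"{hours}h{minutes:02d}m"
    days, hours = divmod(hours, 24)
    return f"{days}d{hours:02d}h"


examples = [readable_delta_t_sketch(value) for value in (42, 125, 3725, 90000)]
```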

resoterre.logging_utils.readable_value(value: int | float, expected_min: int | float = -1e+38, expected_max: int | float = 1e+38) -> str

Return a human-readable string of a value.

Parameters

value : int | float

The value.

expected_min : int | float

The expected minimum value.

expected_max : int | float

The expected maximum value.

Returns

str

A convenient human-readable string of the value.

resoterre.logging_utils.start_root_logger(basic_config_args: dict[str, Any] | None = None, show_logger_name: bool = True, show_date: bool = True, show_loggers_on_init: bool = False, disable_loggers: list[str] | None = None, templates: TemplateStore | None = None) -> str

Start a global root logger with specified configuration.

Parameters

basic_config_args : dict[str, object], optional

Basic configuration arguments for logging.

show_logger_name : bool

Whether to include the logger name in the log format.

show_date : bool

Whether to include the date in the log format.

show_loggers_on_init : bool

Whether to log the existing loggers on initialization.

disable_loggers : list[str], optional

List of logger name prefixes to disable.

templates : TemplateStore, optional

TemplateStore containing template paths, including ‘log_file’.

Returns

str

The path to the log file used by the root logger.

resoterre.memory_utils module

Module for memory usage utilities.

resoterre.memory_utils.check_over_memory(memory_threshold_in_gb: int | float, user_name: str | None = None) -> bool

Check if the memory usage of a specific user exceeds a given threshold in gigabytes.

Parameters

memory_threshold_in_gb : int | float

Memory threshold in gigabytes.

user_name : str, optional

The username to check memory usage for. If None, uses the current user.

Returns

bool

True if memory usage exceeds the threshold, False otherwise.

resoterre.memory_utils.get_memory_usage_by_user(user_name: str | None = None) -> int

Get the total memory usage of all processes owned by a specific user.

Parameters

user_name : str, optional

The username to check memory usage for. If None, uses the current user.

Returns

int

Total memory usage in bytes for the specified user.

resoterre.memory_utils.readable_memory_usage(memory_usage: int) -> str

Convert memory usage in bytes to a human-readable string.

Parameters

memory_usage : int

Memory usage in bytes.

Returns

str

Human-readable memory usage string.
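A byte count is typically made readable by dividing repeatedly until the value falls below the unit base. The sketch below shows that approach under stated assumptions: `readable_memory_usage_sketch` is a hypothetical name, and the choice of binary units (1024) and one decimal place is illustrative rather than taken from the package.

```python
def readable_memory_usage_sketch(memory_usage: int) -> str:
    """Convert a byte count to a string such as '1.5 KiB' (binary units assumed)."""
    value = float(memory_usage)
    for unit in ("B", "KiB", "MiB", "GiB"):
        if value < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024  # move up to the next unit
    return f"{value:.1f} TiB"  # largest unit handled by this sketch


samples = [readable_memory_usage_sketch(n) for n in (512, 1536, 3 * 1024**3)]
```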

resoterre.resoterre module

Main module.

resoterre.snakemake_utils module

Utility functions for Snakemake workflows.

resoterre.snakemake_utils.decode_period_string(period_string: str) -> tuple[datetime, datetime]

Decode a period string into start and end datetime objects.

Parameters

period_string : str

Period string.

Returns

tuple[datetime, datetime]

Start and end datetime objects.

resoterre.snakemake_utils.merge_logs(inputs: Path | str | list[Path | str], output: Path | str, search_patterns: list[str] | None = None, purge: bool = False, from_json_manifest: bool = False) -> None

Merge multiple log files into a single log file, optionally filtering by search patterns.

Parameters

inputs : Path | str | list[Path | str]

Input log file path or list of log file paths. If a single path is given, all .log files (or .json files if from_json_manifest is True) in that directory are merged.

output : Path | str

Output log file path.

search_patterns : list[str], optional

List of strings to search for in log lines. Only lines containing at least one of these strings are included. If None, all lines are included. Default is None.

purge : bool

If True, delete input log files that do not contribute any lines to the output log file. Default is False.

from_json_manifest : bool

If True, treat 'inputs' as a JSON manifest file containing log file paths (under the log_file key of the top-level dictionary).
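The filtering rule for search_patterns (keep a line if it contains at least one pattern, keep everything when no patterns are given) can be sketched as below. `merge_logs_sketch` is a hypothetical, simplified stand-in: it takes an explicit list of files, concatenates them in the order given (an assumption), and leaves out the purge and from_json_manifest options.

```python
import tempfile
from pathlib import Path
from typing import Optional


def merge_logs_sketch(inputs: list, output: Path,
                      search_patterns: Optional[list] = None) -> int:
    """Write matching lines from all inputs to `output`; return how many were kept."""
    kept = []
    for log_path in inputs:
        for line in Path(log_path).read_text().splitlines():
            # Plain substring matching, as described for search_patterns.
            if search_patterns is None or any(p in line for p in search_patterns):
                kept.append(line)
    Path(output).write_text("".join(f"{line}\n" for line in kept))
    return len(kept)


with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp, "a.log")
    second = Path(tmp, "b.log")
    first.write_text("INFO start\nERROR disk full\n")
    second.write_text("INFO done\nWARNING slow\n")
    merged_path = Path(tmp, "merged.log")
    n_kept = merge_logs_sketch([first, second], merged_path, ["ERROR", "WARNING"])
    merged_lines = merged_path.read_text().splitlines()
```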

resoterre.snakemake_utils.merge_manifests(inputs: list[Path | str], output: Path | str) -> None

Merge multiple manifest files into a single manifest file, removing duplicates and empty lines.

Parameters

inputs : list[Path | str]

List of input manifest file paths.

output : Path | str

Output manifest file path.
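Removing duplicates and empty lines while merging can be sketched with a set of already-seen entries. `merge_manifests_sketch` is a hypothetical name, and two details are assumptions: entries are compared after stripping surrounding whitespace, and the first occurrence of each entry determines its position in the output.

```python
import tempfile
from pathlib import Path


def merge_manifests_sketch(inputs: list, output: Path) -> None:
    """Concatenate manifests, dropping blank lines and repeated entries."""
    seen = set()
    merged = []
    for manifest in inputs:
        for line in Path(manifest).read_text().splitlines():
            entry = line.strip()
            if entry and entry not in seen:  # skip blanks and duplicates
                seen.add(entry)
                merged.append(entry)
    Path(output).write_text("".join(f"{entry}\n" for entry in merged))


with tempfile.TemporaryDirectory() as tmp:
    manifest_a = Path(tmp, "a.txt")
    manifest_b = Path(tmp, "b.txt")
    manifest_a.write_text("x.nc\n\ny.nc\n")
    manifest_b.write_text("y.nc\nz.nc\n")
    merged_manifest = Path(tmp, "merged.txt")
    merge_manifests_sketch([manifest_a, manifest_b], merged_manifest)
    manifest_lines = merged_manifest.read_text().splitlines()
```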

resoterre.snakemake_utils.split_glob(search_path: str | Path, glob_pattern: str, batch_size: int = 1, output_directory: str | Path | None = None, manifest_prefix: str | None = None) -> list[list[Path]]

Split files matching a glob pattern into batches.

Parameters

search_path : str | Path

Path to search for files.

glob_pattern : str

Glob pattern to match files.

batch_size : int

Number of files in each batch.

output_directory : str | Path, optional

Directory to save manifest files. If None, no manifest files are saved.

manifest_prefix : str, optional

Prefix for manifest file names. If None, no manifest files are saved.

Returns

list[list[Path]]

List of batches, each batch is a list of Path objects.
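Batching the result of a glob amounts to slicing a list in steps of batch_size. The sketch below shows only that part: `split_glob_sketch` is a hypothetical name, sorting the matches for a reproducible order is an assumption, and writing manifest files is omitted.

```python
import tempfile
from pathlib import Path


def split_glob_sketch(search_path, glob_pattern: str, batch_size: int = 1) -> list:
    """Group files matching `glob_pattern` into lists of at most `batch_size`."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    files = sorted(Path(search_path).glob(glob_pattern))  # sorted for determinism
    return [files[i:i + batch_size] for i in range(0, len(files), batch_size)]


with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.nc", "b.nc", "c.nc", "notes.txt"):
        Path(tmp, name).touch()
    batches = split_glob_sketch(tmp, "*.nc", batch_size=2)
    batch_names = [[path.name for path in batch] for batch in batches]
```

The last batch is simply shorter when the number of files is not a multiple of batch_size.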

resoterre.snakemake_utils.split_period(start_datetime: datetime, end_datetime: datetime, batch_size: int, datetime_format: str, days: int = 0, seconds: int = 0, microseconds: int = 0, milliseconds: int = 0, minutes: int = 0, hours: int = 0, weeks: int = 0) -> list[str]

Split a period into smaller periods based on batch size and time delta.

Parameters

start_datetime : datetime

Start datetime of the period.

end_datetime : datetime

End datetime of the period.

batch_size : int

Number of time steps in each smaller period.

datetime_format : str

Format string for datetime objects.

days : int

Number of days in the time delta.

seconds : int

Number of seconds in the time delta.

microseconds : int

Number of microseconds in the time delta.

milliseconds : int

Number of milliseconds in the time delta.

minutes : int

Number of minutes in the time delta.

hours : int

Number of hours in the time delta.

weeks : int

Number of weeks in the time delta.

Returns

list[str]

List of period strings.

Notes

The current implementation can overshoot end_datetime by a single time step.
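Splitting a period by a time delta and a batch size can be sketched with datetime.timedelta, whose keyword arguments match the days, seconds, and other delta parameters listed above. `split_period_sketch` is a hypothetical name, and several details are assumptions: each sub-period spans batch_size time steps, a period string is the formatted start and end joined by an underscore, and the last sub-period is clamped to the end instead of overshooting it.

```python
from datetime import datetime, timedelta


def split_period_sketch(start: datetime, end: datetime, batch_size: int,
                        datetime_format: str, **delta_kwargs: int) -> list:
    """Return 'start_end' strings covering [start, end) in chunks of batch_size steps."""
    step = timedelta(**delta_kwargs) * batch_size
    if step <= timedelta(0):
        raise ValueError("The time delta and batch_size must give a positive step.")
    periods = []
    current = start
    while current < end:
        batch_end = min(current + step, end)  # clamp so the end is never exceeded
        periods.append(
            f"{current.strftime(datetime_format)}_{batch_end.strftime(datetime_format)}"
        )
        current = batch_end
    return periods


period_strings = split_period_sketch(
    datetime(2020, 1, 1), datetime(2020, 1, 6), 2, "%Y%m%d", days=1
)
```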

resoterre.utils module

General utilities.

class resoterre.utils.TemplateStore(templates: dict[str, str | Template] | None = None, substitutes: dict[str, str] | None = None, substitute_timestamp: bool = True, substitute_pid: bool = True)

Bases: object

Collection of string templates with substitution capabilities.

Parameters

templates : dict[str, str | Template], optional

Dictionary of templates.

substitutes : dict[str, str], optional

Dictionary of substitution values.

substitute_timestamp : bool

Whether to automatically substitute the current timestamp.

substitute_pid : bool

Whether to automatically substitute the current process ID.

add(key: str, value: str | Template) -> None

Add a new template.

Parameters

key : str

Name of the template.

value : str | Template

The template string or Template object.

add_substitutes(**kwargs: str) -> None

Add new substitution values.

Parameters

**kwargs : dict[str, str]

Key-value pairs for substitution.

complete(template_name: str) -> str

Complete the template by performing substitutions.

Parameters

template_name : str

Name of the template to complete.

Returns

str

The completed template string.
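The add, add_substitutes and complete workflow can be sketched on top of the standard library's string.Template. `TemplateStoreSketch` is a hypothetical stand-in, not the class documented here: it assumes `$name` / `${name}` placeholder syntax and leaves out the automatic timestamp and process ID substitutions, whose placeholder names are not given in this reference.

```python
from string import Template
from typing import Optional


class TemplateStoreSketch:
    """Named templates plus a shared dictionary of substitution values."""

    def __init__(self, templates: Optional[dict] = None,
                 substitutes: Optional[dict] = None) -> None:
        self.templates: dict = {}
        self.substitutes: dict = dict(substitutes or {})
        for key, value in (templates or {}).items():
            self.add(key, value)

    def add(self, key: str, value) -> None:
        # Plain strings are wrapped so every stored value is a Template.
        self.templates[key] = value if isinstance(value, Template) else Template(value)

    def add_substitutes(self, **kwargs: str) -> None:
        self.substitutes.update(kwargs)

    def complete(self, template_name: str) -> str:
        # Template.substitute raises KeyError if a placeholder has no value.
        return self.templates[template_name].substitute(self.substitutes)


store = TemplateStoreSketch(
    templates={"log_file": "logs/${experiment}_${stage}.log"},
    substitutes={"experiment": "demo"},
)
store.add_substitutes(stage="train")
completed = store.complete("log_file")
```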

substitutes: dict[str, str]

templates: dict[str, Template]

resoterre.utils.unique_hex_digest(unique_elements: Any, length: int = 8) -> str

Generate a unique hexadecimal digest based on the input elements.

Parameters

unique_elements : Any

The elements to generate a unique digest for. They must have a unique string representation.

length : int

The length of the hexadecimal digest truncation to return.

Returns

str

A hexadecimal digest string of the specified length.
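A truncated digest of this kind is usually obtained by hashing a string representation of the input. The sketch below illustrates that approach with assumptions stated plainly: `unique_hex_digest_sketch` is a hypothetical name, and the use of `repr` and SHA-256 is a choice made for this example, since the hash function used by the package is not specified here.

```python
import hashlib
from typing import Any


def unique_hex_digest_sketch(unique_elements: Any, length: int = 8) -> str:
    """Hash the repr of `unique_elements` and keep the first `length` hex characters."""
    text = repr(unique_elements)  # relies on a stable, unique string representation
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:length]


digest = unique_hex_digest_sketch(("model-a", 2020, 0.5))
same_digest = unique_hex_digest_sketch(("model-a", 2020, 0.5))
longer_digest = unique_hex_digest_sketch(("model-a", 2020, 0.5), length=12)
```

Shorter truncations are more convenient in file names but raise the chance that two different inputs share a digest.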