"""Utility functions for Snakemake workflows."""
import json
from datetime import datetime, timedelta
from pathlib import Path
[docs]
def merge_manifests(inputs: list[Path | str], output: Path | str) -> None:
"""
Merge multiple manifest files into a single manifest file, removing duplicates and empty lines.
Parameters
----------
inputs : list[Path | str]
List of input manifest file paths.
output : Path | str
Output manifest file path.
"""
Path(output).parent.mkdir(parents=True, exist_ok=True)
seen = set()
with Path(output).open("w") as out:
for infile in inputs:
with Path(infile).open("r") as f:
for line in f:
line = line.rstrip("\r\n")
if not line or line in seen:
continue
seen.add(line)
out.write(f"{line}\n")
[docs]
def merge_logs(
inputs: Path | str | list[Path | str],
output: Path | str,
search_patterns: list[str] | None = None,
purge: bool = False,
from_json_manifest: bool = False,
) -> None:
"""
Merge multiple log files into a single log file, optionally filtering by search patterns.
Parameters
----------
inputs : Path | str | list[Path | str]
Input log file path or list of log file paths.
If a single path is given, all .log (or .json if from_json_manifest is True) in that directory are merged.
output : Path | str
Output log file path.
search_patterns : list[str], optional
List of strings to search for in log lines. Only lines containing at least one of these strings are included.
If None, all lines are included. Default is None.
purge : bool
If True, delete input log files that do not contribute any lines to the output log file.
Default is False.
from_json_manifest : bool
If True, treat 'inputs' as a JSON manifest file containing log file paths (log_file key at top level dict).
"""
inputs_extension = "*.json" if from_json_manifest else "*.log"
if not isinstance(inputs, list):
inputs = sorted(list(Path(inputs).glob(inputs_extension)))
Path(output).parent.mkdir(parents=True, exist_ok=True)
leading_str = ""
with Path(output).open("w") as out:
for infile in inputs:
wrote_a_line = False
if from_json_manifest:
with Path(infile).open("r") as f:
manifest_content = json.load(f)
log_infile = manifest_content["log_file"]
else:
log_infile = infile
with Path(log_infile).open("r") as f:
for line in f:
if search_patterns is not None:
for search_pattern in search_patterns:
if search_pattern in line:
break
else:
continue
if not wrote_a_line:
out.write(f"{leading_str}--- From file: {log_infile} ---\n\n")
wrote_a_line = True
leading_str = "\n"
out.write(line)
if purge and (not wrote_a_line):
Path(log_infile).unlink()
[docs]
def decode_period_string(period_string: str) -> tuple[datetime, datetime]:
"""
Decode a period string into start and end datetime objects.
Parameters
----------
period_string : str
Period string.
Returns
-------
tuple[datetime, datetime]
Start and end datetime objects.
"""
start_datetime_string, end_datetime_string = period_string.split("_")
if len(start_datetime_string) == 10:
start_datetime = datetime.strptime(start_datetime_string, "%Y%m%d%H")
else:
raise NotImplementedError()
if len(end_datetime_string) == 10:
end_datetime = datetime.strptime(end_datetime_string, "%Y%m%d%H")
else:
raise NotImplementedError()
return start_datetime, end_datetime
[docs]
def split_period(
start_datetime: datetime,
end_datetime: datetime,
batch_size: int,
datetime_format: str,
days: int = 0,
seconds: int = 0,
microseconds: int = 0,
milliseconds: int = 0,
minutes: int = 0,
hours: int = 0,
weeks: int = 0,
) -> list[str]:
"""
Split a period into smaller periods based on batch size and time delta.
Parameters
----------
start_datetime : datetime
Start datetime of the period.
end_datetime : datetime
End datetime of the period.
batch_size : int
Number of time steps in each smaller period.
datetime_format : str
Format string for datetime objects.
days : int
Number of days in the time delta.
seconds : int
Number of seconds in the time delta.
microseconds : int
Number of microseconds in the time delta.
milliseconds : int
Number of milliseconds in the time delta.
minutes : int
Number of minutes in the time delta.
hours : int
Number of hours in the time delta.
weeks : int
Number of weeks in the time delta.
Returns
-------
list[str]
List of period strings.
Notes
-----
Current implementation can overshoot the end_datetime by a single time step.
"""
period_strings = []
current_datetime = start_datetime
while current_datetime <= end_datetime:
period_start_datetime = current_datetime
for _ in range(batch_size - 1):
current_datetime += timedelta(
days=days,
seconds=seconds,
microseconds=microseconds,
milliseconds=milliseconds,
minutes=minutes,
hours=hours,
weeks=weeks,
)
if current_datetime == end_datetime:
break
period_start_string = period_start_datetime.strftime(datetime_format)
period_end_string = current_datetime.strftime(datetime_format)
period_strings.append(f"{period_start_string}_{period_end_string}")
current_datetime += timedelta(
days=days,
seconds=seconds,
microseconds=microseconds,
milliseconds=milliseconds,
minutes=minutes,
hours=hours,
weeks=weeks,
)
return period_strings
[docs]
def split_glob(
search_path: str | Path,
glob_pattern: str,
batch_size: int = 1,
output_directory: str | Path | None = None,
manifest_prefix: str | None = None,
) -> list[list[Path]]:
"""
Split files matching a glob pattern into batches.
Parameters
----------
search_path : str | Path
Path to search for files.
glob_pattern : str
Glob pattern to match files.
batch_size : int
Number of files in each batch.
output_directory : str | Path
Directory to save manifest files. If None, no manifest files are saved.
manifest_prefix : str
Prefix for manifest file names. If None, no manifest files are saved.
Returns
-------
list[list[Path]]
List of batches, each batch is a list of Path objects.
"""
all_files = sorted(Path(search_path).glob(glob_pattern))
batches = [all_files[i : i + batch_size] for i in range(0, len(all_files), batch_size)]
if (output_directory is not None) and (manifest_prefix is not None):
output_directory = Path(output_directory)
output_directory.mkdir(parents=True, exist_ok=True)
for i, batch in enumerate(batches):
manifest_path = Path(output_directory, f"{manifest_prefix}_{str(i).zfill(8)}.txt")
manifest_path.write_text("\n".join(str(p) for p in batch))
return batches