Input/Output
I/O from and to S3
Reusable functions to read/write data from/to our S3 drive.
- impresso_essentials.io.s3.alternative_read_text(s3_key: str, s3_credentials: dict, line_by_line: bool = True) list[str] | str
Read from S3 a line-separated text file (e.g. *.jsonl.bz2).
- Note:
The reason for this function is a bug in dask.bag.read_text() which breaks on buckets having >= 1000 keys. It raises a FileNotFoundError.
- Parameters:
s3_key (str) – Full S3 path to the file to read.
s3_credentials (dict) – S3 credentials, IMPRESSO_STORAGEOPT.
line_by_line (bool, optional) – Whether to read the file line by line. Defaults to True.
- Returns:
Contents of the file, as a list of strings or as one string.
- Return type:
list[str] | str
- impresso_essentials.io.s3.extract_provider_alias_key(s3_key: str, bucket: str, prov_included: bool = True) tuple[str, str]
Extract the media alias (and provider) that an S3 key corresponds to, given the bucket and partition.
E.g., for an s3_key in the following format:
- s3_key: ‘s3://31-passim-rebuilt-staging/passim/[provider]/[alias]/[alias]-[year].jsonl.bz2’
- bucket: ‘31-passim-rebuilt-staging/passim’
- prov_included: True -> returns (provider, alias)
- Parameters:
s3_key (str) – Full S3 path of a file (as returned by fixed_s3fs_glob).
bucket (str) – S3 bucket, including partition, in which the media dirs are.
prov_included (bool, optional) – Whether or not the provider level is present in the structure of the provided bucket. Defaults to True.
- Returns:
Media alias of the corresponding media, and corresponding provider.
- Return type:
tuple[str, str]
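The key layout described above can be illustrated with plain string handling. This is a minimal sketch, not the library's implementation: the helper name `split_provider_alias` is hypothetical, and returning `None` for the provider when `prov_included` is False is an assumption made for the illustration.

```python
from typing import Optional


def split_provider_alias(
    s3_key: str, bucket: str, prov_included: bool = True
) -> tuple[Optional[str], str]:
    """Hypothetical helper: split a key into (provider, alias)."""
    # Drop the scheme, then the bucket/partition prefix.
    path = s3_key.removeprefix("s3://")
    relative = path.removeprefix(bucket.strip("/")).lstrip("/")
    parts = relative.split("/")
    if prov_included:
        # Layout: [provider]/[alias]/[alias]-[year].jsonl.bz2
        return parts[0], parts[1]
    # Layout: [alias]/[alias]-[year].jsonl.bz2 (provider unknown here).
    return None, parts[0]


key = "s3://31-passim-rebuilt-staging/passim/SNL/GDL/GDL-1950.jsonl.bz2"
print(split_provider_alias(key, "31-passim-rebuilt-staging/passim"))
```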
- impresso_essentials.io.s3.fetch_files(bucket_name: str, compute: bool = True, file_type: str = 'issues', providers_filter: list[str] | None = None, aliases_filter: list[str] | None = None) tuple[Bag | None, Bag | None] | tuple[list[str] | None, list[str] | None]
Fetch issue and/or page canonical JSON files from an s3 bucket.
If compute=True, the output will be a list of the contents of all files in the bucket for the specified newspapers and type of files. If compute=False, the output will remain in a distributed dask.bag.
For the file type, the possible values are the following:
- ‘issues’, ‘pages’, ‘audios’: include only bz2 files of the given type.
- ‘supports’: include all pages and audios bz2 files, returned in element [1] of the tuple.
- ‘both’: include all types of files, with issues ([0]) and supports (pages and audios) ([1]).
Based on file_type, the issue files, page/audio (“support”) files or both will be returned. In the returned tuple, issues are always in the first element and supports in the second, hence if file_type is not ‘both’, the tuple entry corresponding to the undesired type of files will be None.
- Parameters:
bucket_name (str) – Name of the S3 bucket to fetch the files from.
compute (bool, optional) – Whether to compute result and output as list. Defaults to True.
file_type (str, optional) – Type of files to list, possible values are “issues”, “pages”, “audios”, “supports” and “both”. Defaults to “issues”.
providers_filter (list[str] | None, optional) – List of providers for which to consider the aliases. If None, aliases_filter will be considered. Defaults to None.
aliases_filter (list[str] | None, optional) – List of aliases to consider. If None, all will be considered. Defaults to None.
- Raises:
NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘audios’, ‘supports’, ‘both’].
- Returns:
[0] Issue files’ contents or None and [1] Page and Audio Record files’ contents or None based on file_type
- Return type:
tuple[db.core.Bag|None, db.core.Bag|None] | tuple[list[str]|None, list[str]|None]
- impresso_essentials.io.s3.fixed_s3fs_glob(path: str, suffix: str | None = None, boto3_bucket=None) list[str]
Custom glob function able to list more than 1000 elements on s3 (fix of s3fs).
Note
path should be of the form “[partition]*[suffix or file extensions]”, with the partition potentially including the bucket name. If all files within the partition should be considered, regardless of their extension, “*” can be omitted. Conversely, path can be of the form “[partition]” if suffix is defined.
- Parameters:
path (str) – Glob path to the files, optionally including the bucket name. If the bucket name is not included, boto3_bucket should be defined.
suffix (str | None, optional) – Suffix or extension of the paths to consider within the bucket. Only used if “*” not found in path. Defaults to None.
boto3_bucket (boto3.resources.factory.s3.Bucket, optional) – S3 bucket to look into. Defaults to None.
- Returns:
List of filenames within the bucket corresponding to the provided path.
- Return type:
list[str]
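The interplay between path and suffix described in the note can be sketched without any S3 access. The helpers `split_glob_path` and `match_keys` below are hypothetical and only mirror the documented rule that suffix is used when “*” is absent from path; the actual function lists keys through boto3.

```python
from typing import Optional


def split_glob_path(path: str, suffix: Optional[str] = None) -> tuple[str, str]:
    """Hypothetical helper: derive (prefix, ending) from a glob-like path."""
    path = path.removeprefix("s3://")
    if "*" in path:
        # "[partition]*[suffix]": the explicit suffix argument is ignored.
        prefix, _, ending = path.partition("*")
        return prefix, ending
    # "[partition]" only: fall back on the suffix argument, if any.
    return path, suffix or ""


def match_keys(keys: list[str], path: str, suffix: Optional[str] = None) -> list[str]:
    """Keep the keys that start with the prefix and end with the ending."""
    prefix, ending = split_glob_path(path, suffix)
    return [k for k in keys if k.startswith(prefix) and k.endswith(ending)]


keys = ["GDL/a.jsonl.bz2", "GDL/b.txt", "JDG/c.jsonl.bz2"]
print(match_keys(keys, "GDL/*.jsonl.bz2"))
print(match_keys(keys, "GDL/", suffix=".txt"))
```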
- impresso_essentials.io.s3.get_bucket(bucket_name: str)
Create a boto3 connection and return the desired bucket.
Note
This function does not ensure that the bucket exists. If this verification is necessary, please prefer using get_or_create_bucket() instead.
- Parameters:
bucket_name (str) – Name of the S3 bucket to use.
- Returns:
Desired S3 bucket.
- Return type:
boto3.resources.factory.s3.Bucket
- impresso_essentials.io.s3.get_or_create_bucket(name: str, create: bool = False)
Create a boto3 s3 connection and create or return the requested bucket.
It is possible to ask for a new bucket with the specified name to be created (in case it does not exist):
>>> b = get_or_create_bucket('testb', create=False)
>>> b = get_or_create_bucket('testb', create=True)
- Parameters:
name (str) – Name of the bucket to get or create.
create (bool, optional) – Whether to create the bucket if it doesn’t exist. Defaults to False.
- Returns:
S3 bucket, fetched or created.
- Return type:
boto3.resources.factory.s3.Bucket
- impresso_essentials.io.s3.get_s3_client(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') BaseClient
Create S3 boto3 client using environment variables from local .env files.
Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.
- Parameters:
host_url (str | None, optional) – URL of the S3 endpoint to connect to. Defaults to “https://os.zhdk.cloud.switch.ch/”.
- Raises:
e – Argument host_url was not provided and SE_HOST_URL was not in the env.
e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.
- Returns:
The S3 boto3 client.
- Return type:
BaseClient
- impresso_essentials.io.s3.get_s3_object_size(bucket_name: str, key: str) int
Get the size of an object (key) in an S3 bucket.
- Parameters:
bucket_name (str) – The name of the S3 bucket.
key (str) – The key (object) whose size you want to retrieve.
- Returns:
The size of the object in bytes, or None if the object doesn’t exist.
- Return type:
int
- impresso_essentials.io.s3.get_s3_resource(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') ServiceResource
Get a boto3 resource object related to an S3 drive.
Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.
- Parameters:
host_url (str | None, optional) – URL of the S3 endpoint to connect to. Defaults to “https://os.zhdk.cloud.switch.ch/”.
- Raises:
e – Argument host_url was not provided and SE_HOST_URL was not in the env.
e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.
- Returns:
S3 resource associated to the endpoint.
- Return type:
ServiceResource
- impresso_essentials.io.s3.get_storage_options() dict[str, dict | str]
Load environment variables from local .env files
Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.
- Returns:
Credentials to access a S3 endpoint.
- Return type:
dict[str, dict | str]
- impresso_essentials.io.s3.list_canonical_files(bucket_name: str, file_type: str = 'issues', providers_filter: list[str] | None = None, aliases_filter: list[str] | None = None) tuple[list[str] | None, list[str] | None]
List the canonical files located in a given S3 bucket.
Note
The filters are applied in a hierarchical manner: first at provider level, then at alias level if no provider filter was given.
Note
For the file type, the possible values are the following:
- “issues”, “pages”, “audios”: include only bz2 files of the given type.
- “supports”: include all pages and audios bz2 files, returned in element [1] of the tuple.
- “both”: include all types of files, with issues ([0]) and supports ([1]).
- Parameters:
bucket_name (str) – S3 bucket name.
file_type (str, optional) – Type of files to list, possible values are “issues”, “pages”, “audios”, “supports” and “both”. Defaults to “issues”.
providers_filter (list[str] | None, optional) – List of providers for which to consider the aliases. If None, aliases_filter will be considered. Defaults to None.
aliases_filter (list[str] | None, optional) – List of aliases to consider. If None, all will be considered. Defaults to None.
- Raises:
NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘audios’, ‘supports’, ‘both’].
- Returns:
[0] List of issue files or None and [1] List of page/audio files or None, based on file_type.
- Return type:
tuple[list[str] | None, list[str] | None]
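The tuple convention (issues in [0], supports in [1], None for whatever was not requested) can be sketched on an in-memory list of keys. The helper `split_canonical_files` is hypothetical, and the assumption that file names end in `-issues.jsonl.bz2`, `-pages.jsonl.bz2` or `-audios.jsonl.bz2` is made purely for the illustration.

```python
from typing import Optional

VALID_TYPES = ("issues", "pages", "audios", "supports", "both")


def split_canonical_files(
    keys: list[str], file_type: str = "issues"
) -> tuple[Optional[list[str]], Optional[list[str]]]:
    """Hypothetical helper: return (issue_keys, support_keys) for file_type."""
    if file_type not in VALID_TYPES:
        raise NotImplementedError(f"Unsupported file_type: {file_type!r}")

    def pick(kinds: list[str]) -> list[str]:
        # Assumed naming scheme: "...-<kind>.jsonl.bz2".
        return [
            k for k in keys if any(k.endswith(f"-{kind}.jsonl.bz2") for kind in kinds)
        ]

    issues = pick(["issues"]) if file_type in ("issues", "both") else None
    if file_type in ("supports", "both"):
        supports = pick(["pages", "audios"])
    elif file_type in ("pages", "audios"):
        supports = pick([file_type])
    else:
        supports = None
    return issues, supports
```

Calling it with `file_type="pages"` returns `None` in position [0], so callers can always unpack two values regardless of the requested type.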
- impresso_essentials.io.s3.list_media_titles(bucket_name: str, s3_client=<botocore.client.S3 object>, page_size: int = 10000, prov_included: bool = True) list[str]
List media titles contained in an s3 bucket with impresso data.
By default, the bucket is assumed to contain the provider level.
Note
25,000 seems to be the maximum PageSize value supported by SwitchEngines’ S3 implementation (ceph).
- Parameters:
bucket_name (str) – Name of the S3 bucket to consider
s3_client (optional) – S3 client to use. Defaults to get_s3_client().
page_size (int, optional) – Pagination configuration. Defaults to 10000.
prov_included (bool, optional) – Whether the provider level is included in the bucket structure. Defaults to True
- Returns:
Sorted list of media titles (aliases) present in the given S3 bucket.
- Return type:
list[str]
- impresso_essentials.io.s3.list_providers_and_aliases(bucket_name: str, prefix: str = '') list[str]
Lists providers and their alias directories from an S3 bucket.
Traverses the given S3 bucket to identify provider directories and their associated alias subdirectories, under an optional prefix. Returns a dictionary mapping each provider to a sorted list of aliases.
- Parameters:
bucket_name (str) – The name of the S3 bucket to query.
prefix (str, optional) – The prefix path within the bucket to search (should have a trailing /). Defaults to the root (‘’).
- Returns:
Dict mapping sorted provider names to lists of alias directory names.
- Return type:
dict[str, list[str]]
- Raises:
botocore.exceptions.ClientError – If there is an issue communicating with S3.
Example
>>> list_providers_and_aliases('141-processed-data-staging', 'embeddings/images/embeddings_dinov2_v1-0-0/')
{
    'BCUL': ['ACI', 'AV', 'Bombe', 'CL', ..., 'esta', 'ouistiti'],
    ...
    'SNL': ['BDC', 'CDV', ..., 'WHD', 'ZBT']
}
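The shape of the returned mapping can be reproduced from a flat list of keys. This sketch uses a hypothetical helper, `group_aliases_by_provider`, and assumes keys laid out as `[prefix][provider]/[alias]/[file]`; the actual function traverses the bucket through S3 listing calls instead.

```python
from collections import defaultdict


def group_aliases_by_provider(keys: list[str], prefix: str = "") -> dict[str, list[str]]:
    """Hypothetical helper: map each provider to its sorted alias names."""
    mapping: dict[str, set[str]] = defaultdict(set)
    for key in keys:
        if not key.startswith(prefix):
            continue
        parts = key[len(prefix):].split("/")
        # Expect at least provider/alias/file below the prefix.
        if len(parts) >= 3:
            mapping[parts[0]].add(parts[1])
    # Sort both the providers and each provider's aliases.
    return {prov: sorted(mapping[prov]) for prov in sorted(mapping)}


keys = [
    "emb/SNL/CDV/CDV-1900.jsonl.bz2",
    "emb/SNL/BDC/BDC-1839.jsonl.bz2",
    "emb/BCUL/ACI/ACI-1832.jsonl.bz2",
]
print(group_aliases_by_provider(keys, prefix="emb/"))
```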
- impresso_essentials.io.s3.list_s3_directories(bucket_name: str, prefix: str = '') list[str]
Retrieve ‘directory’ names in an S3 bucket given a path prefix.
Depending on the prefix and the specific bucket, this will either list the media providers or the media aliases. For S3 partitions which have the partner layer, the prefix parameter makes it possible to select a specific partner and list its associated media aliases.
- Parameters:
bucket_name (str) – The name of the S3 bucket.
prefix (str) – The prefix path within the bucket to search (should include a trailing /). Defaults to the root (‘’).
- Returns:
A list of ‘directory’ names found in the specified bucket and prefix.
- Return type:
list
- impresso_essentials.io.s3.provider_in_path(s3_path=None, bucket=None, prefix='') bool
Determines whether the given S3 path or prefix includes a provider-level directory structure.
- Parameters:
s3_path (str, optional) – Full S3 URI (e.g., ‘s3://my-bucket/prefix/’). If provided, it overrides bucket and prefix.
bucket (str, optional) – S3 bucket name. Required if s3_path is not given.
prefix (str, optional) – Prefix (folder path) within the bucket. Ignored if s3_path is provided. Defaults to ‘’.
- Returns:
True if all immediate subfolders match known providers.
False if all match known media aliases.
- Return type:
bool
- Raises:
AttributeError – If neither s3_path nor bucket is provided, or if a mix of provider and alias directories is found.
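The decision rule (all subfolders are providers, all are aliases, or an error on a mix) can be sketched as follows. The two sets of known names are placeholders for the illustration, not the library's reference lists, and raising on an empty listing is an assumption.

```python
# Placeholder reference sets, assumed for this sketch only.
KNOWN_PROVIDERS = {"SNL", "BCUL"}
KNOWN_ALIASES = {"GDL", "BDC", "CDV", "ACI"}


def subdirs_are_providers(subdirs: list[str]) -> bool:
    """Hypothetical helper: True for a provider level, False for an alias level."""
    if not subdirs:
        # Assumption: an empty listing cannot be classified.
        raise AttributeError("No subdirectories to inspect.")
    if all(name in KNOWN_PROVIDERS for name in subdirs):
        return True
    if all(name in KNOWN_ALIASES for name in subdirs):
        return False
    raise AttributeError("Mix of provider and alias directories found.")


print(subdirs_are_providers(["SNL", "BCUL"]))
print(subdirs_are_providers(["GDL", "BDC"]))
```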
- impresso_essentials.io.s3.read_jsonlines(key_name: str, bucket_name: str) Generator
Given the S3 key of a jsonl.bz2 archive, extract and return its lines.
Usage example:
>>> lines = db.from_sequence(read_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('id').take(10)
- Parameters:
key_name (str) – S3 key, without S3 prefix, but with partitions within.
bucket_name (str) – Name of S3 bucket to use.
- Raises:
ValueError – The provided key_name does not exist in the provided bucket.
- Yields:
Generator – generator yielding lines within the archive one by one.
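The line-by-line decompression can be tried locally on in-memory bytes. This is a sketch under the assumption that the archive body has already been downloaded; `iter_jsonl_bz2` is a hypothetical name and does not talk to S3.

```python
import bz2
import io
from typing import Iterator


def iter_jsonl_bz2(raw: bytes) -> Iterator[str]:
    """Hypothetical helper: yield the non-empty lines of a bz2-compressed JSONL payload."""
    with bz2.open(io.BytesIO(raw), mode="rt", encoding="utf-8") as handle:
        for line in handle:
            line = line.rstrip("\n")
            if line:
                yield line


# Build a small archive in memory instead of fetching one from a bucket.
payload = bz2.compress(b'{"id": "a"}\n{"id": "b"}\n')
for line in iter_jsonl_bz2(payload):
    print(line)
```

Because the function is a generator, lines are decompressed lazily, which keeps memory usage low for large archives.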
- impresso_essentials.io.s3.read_s3_issues(alias: str, year: str, input_bucket: str, provider: str | None = None, incl_provider: bool = True) list[tuple[IssueDir, dict]]
Read the contents of canonical issues from a given S3 bucket.
By default, the bucket is assumed to include the provider level in its organization. If it does not, set incl_provider=False so that the provider is left out of the constructed S3 path. If the provider is not given, it is deduced from the alias. In either case, the provider is returned within the IssueDir object.
- Parameters:
alias (str) – Alias of the media title to read the issues from.
year (str) – Target year to read issues from.
input_bucket (str) – Bucket from where to fetch the issues.
provider (str|None, optional) – Provider for the given alias. Defaults to None.
incl_provider (bool, optional) – Whether to include the provider in the S3 path. Defaults to True.
- Returns:
List of IssueDirs and the issues’ contents.
- Return type:
list[tuple[IssueDir, dict]]
- impresso_essentials.io.s3.readtext_jsonlines(key_name: str, bucket_name: str, fields_to_keep: list[str] | None = None) Generator
Given the S3 key of a jsonl.bz2 archive, return the textual information of its lines.
Only the provided fields (or default ones) will be kept in the returned lines. By default, fields_to_keep = [“id”, “st”, “sm”, “pp”, “rr”, “ts”, “lg”, “tp”, “title”, “ft”]
This can serve as the starting point for pure textual processing. Usage example:
>>> lines = db.from_sequence(readtext_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('ft').take(10)
- Parameters:
key_name (str) – S3 key, without S3 prefix, but with partitions within.
bucket_name (str) – Name of S3 bucket to use.
- Raises:
ValueError – The provided key_name does not exist in the provided bucket.
- Yields:
Generator – generator yielding reformatted lines within the archive one by one.
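The field filtering applied to each line can be sketched on a single JSON string. `keep_fields` is a hypothetical helper; silently skipping fields that are absent from a record is an assumption, since the handling of missing keys is not documented here.

```python
import json
from typing import Optional

# Default field list as documented above.
DEFAULT_FIELDS = ["id", "st", "sm", "pp", "rr", "ts", "lg", "tp", "title", "ft"]


def keep_fields(line: str, fields_to_keep: Optional[list[str]] = None) -> str:
    """Hypothetical helper: re-serialise a JSON line with only the selected fields."""
    fields = fields_to_keep or DEFAULT_FIELDS
    record = json.loads(line)
    # Assumption: fields missing from the record are skipped.
    return json.dumps({key: record[key] for key in fields if key in record})


line = '{"id": "GDL-1950-01-02-a-i0002", "ft": "Some text", "extra": 1}'
print(keep_fields(line))
print(keep_fields(line, fields_to_keep=["id"]))
```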
- impresso_essentials.io.s3.s3_glob_with_size(path: str, boto3_bucket=None) list[tuple]
Custom glob function to list S3 objects matching a pattern. This function works around the 1000-object listing limit in S3 by using boto3 directly.
- Parameters:
path (str) – The S3 path with a wildcard (*) to match files. Example: s3://bucket_name/path/to/files/*.txt.
boto3_bucket (boto3.Bucket, optional) – An optional boto3 Bucket object. If not provided, it will be created from the path.
- Returns:
A list of tuples containing the full S3 paths of matching files and their sizes in megabytes.
- Return type:
list
- impresso_essentials.io.s3.s3_iter_bucket(bucket_name: str, prefix: str = '', suffix: str = '', accept_key: ~typing.Callable[[str], bool] = <function <lambda>>) list
Iterate over a bucket, returning all keys with some filtering options.
Note that prefix should include the provider if the bucket has a provider level.
>>> k = s3_iter_bucket("myBucket", prefix='SNL', suffix=".bz2")
>>> k = s3_iter_bucket("myBucket", prefix='SNL/GDL', accept_key=lambda x: "page" in x)
>>> k = s3_iter_bucket("myBucket", accept_key=lambda x: "/GDL-" in x)
Note
If suffix is not “”, the used accepting condition will become: lambda key: accept_key(key) and key.endswith(suffix)
- Parameters:
bucket_name (str) – Name of the S3 bucket to list the contents of
prefix (str, optional) – Partition prefix to filter bucket’s keys. Defaults to “”.
suffix (str, optional) – Suffix to filter the bucket’s keys. Defaults to “”.
accept_key (Callable[[str], bool], optional) – Filtering condition for the keys as a lambda function. Defaults to lambda k: True.
- Returns:
List of keys corresponding to the provided prefix, suffix and accept key.
- Return type:
list
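How suffix and accept_key combine can be checked on a plain list of keys. The helper `filter_keys` is a hypothetical stand-in that applies the documented condition, `accept_key(key) and key.endswith(suffix)`, without listing a real bucket.

```python
from typing import Callable


def filter_keys(
    keys: list[str],
    prefix: str = "",
    suffix: str = "",
    accept_key: Callable[[str], bool] = lambda k: True,
) -> list[str]:
    """Hypothetical helper: apply the prefix, suffix and accept_key filters to keys."""
    if suffix != "":
        # A non-empty suffix tightens the accepting condition.
        def condition(key: str) -> bool:
            return accept_key(key) and key.endswith(suffix)
    else:
        condition = accept_key
    return [k for k in keys if k.startswith(prefix) and condition(k)]


keys = [
    "SNL/GDL/GDL-1950-issues.jsonl.bz2",
    "SNL/GDL/pages/GDL-1950-01-02-a-pages.jsonl.bz2",
    "SNL/GDL/notes.txt",
]
print(filter_keys(keys, prefix="SNL", suffix=".bz2"))
print(filter_keys(keys, prefix="SNL/GDL", accept_key=lambda x: "page" in x))
```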
- impresso_essentials.io.s3.upload_to_s3(local_path: str, path_within_bucket: str, bucket_name: str) bool
Upload a file to an S3 bucket.
- Parameters:
local_path (str) – The local file path to upload.
path_within_bucket (str) – The path within the bucket where the file will be uploaded.
bucket_name (str) – The name of the S3 bucket (without any partitions).
- Returns:
True if the upload is successful, False otherwise.
- Return type:
bool
I/O from and to file system
Code for parsing impresso’s canonical directory structures.
- impresso_essentials.io.fs_utils.canonical_path(issuedir: IssueDir, suffix: str = None, extension: str = None, as_dir: bool = False, incl_provider: bool = True) str
Create a canonical dir, filename or ID from an IssueDir object.
- Parameters:
issuedir (IssueDir) – IssueDir object to create the path for
suffix (str, optional) – Suffix to use which will follow the issue ID. eg. Can be ‘pages’, ‘i0001’ or “p” + str(num).zfill(4). Defaults to None.
extension (str, optional) – File extension to use if creating a filename. Defaults to None.
as_dir (bool, optional) – Whether the result is a directory (‘/’ separator) or a filename or ID (‘-’ separator). Defaults to False.
incl_provider (bool, optional) – Whether to include the provider when as_dir=True. Defaults to True.
- Returns:
Constructed canonical ID, filename or canonical path for given IssueDir.
- Return type:
str
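The difference between the ID/filename form (‘-’ separator) and the directory form (‘/’ separator) can be sketched as below. `IssueDirSketch` is a hypothetical stand-in for IssueDir with assumed fields, and the way the extension is appended is likewise an assumption, so treat this as an illustration of the separators rather than the function's exact behaviour.

```python
from datetime import date
from typing import NamedTuple, Optional


class IssueDirSketch(NamedTuple):
    """Hypothetical stand-in for IssueDir; field names are assumptions."""
    provider: str
    alias: str
    date: date
    edition: str


def canonical_path_sketch(
    issuedir: IssueDirSketch,
    suffix: Optional[str] = None,
    extension: Optional[str] = None,
    as_dir: bool = False,
    incl_provider: bool = True,
) -> str:
    sep = "/" if as_dir else "-"
    d = issuedir.date
    parts = [issuedir.alias, f"{d.year:04d}", f"{d.month:02d}", f"{d.day:02d}", issuedir.edition]
    if as_dir and incl_provider:
        # The provider only appears in the directory form.
        parts.insert(0, issuedir.provider)
    if suffix:
        parts.append(suffix)
    result = sep.join(parts)
    if extension:
        # Assumption: the extension is appended after a single dot.
        result = f"{result}.{extension.lstrip('.')}"
    return result


issue = IssueDirSketch("SNL", "GDL", date(1950, 1, 2), "a")
print(canonical_path_sketch(issue))
print(canonical_path_sketch(issue, as_dir=True))
```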
- impresso_essentials.io.fs_utils.check_filenaming(file_basename: str, object_type: str = 'issue') Match[str] | None
Check whether a file’s basename complies with the naming convention.
- Parameters:
file_basename (str) – Basename of file to check (excluding extension).
object_type (str, optional) – Type of objects in the given file. One of “issue”, “page”, “audio”, “rebuilt”. Defaults to ‘issue’.
- Returns:
The resulting match if correct, None otherwise.
- Return type:
Match[str] | None
- impresso_essentials.io.fs_utils.check_id(canonical_id: str, object_type: str = 'issue') Match[str] | None
Check whether a canonical ID complies with the naming convention.
- Parameters:
canonical_id (str) – Canonical ID to check.
object_type (str, optional) – Object it corresponds to. One of “issue”, “page”, “audio”, “content-item”. Defaults to ‘issue’.
- Returns:
The resulting match if correct, None otherwise
- Return type:
Match[str] | None
- impresso_essentials.io.fs_utils.get_issueshortpath(issuedir: IssueDir, incl_provider: bool = True) str
Return short version of an IssueDir’s path, starting from the media alias.
If incl_provider, beware that the provider needs to be
- Parameters:
issuedir (IssueDir) – IssueDir instance from which to get the short path.
incl_provider (bool, optional) – whether to include the provider. Defaults to True.
- Returns:
Canonical path to the issue starting at the media alias.
- Return type:
str
- impresso_essentials.io.fs_utils.glob_with_size(directory: str, file_suffix: str) list[str]
List all files in a directory with a given suffix and their size in MB.
- Parameters:
directory (str) – The directory path to search for files.
file_suffix (str) – The file extension or suffix to match.
- Returns:
A list of tuples, each containing the file path and its size in megabytes, rounded to six decimal places.
- Return type:
list[str]
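A local equivalent can be sketched with the standard library. `list_files_with_size` is a hypothetical name; the sketch assumes a non-recursive listing and 1 MB = 1024 * 1024 bytes, neither of which is stated in the description above.

```python
import os


def list_files_with_size(directory: str, file_suffix: str) -> list[tuple[str, float]]:
    """Hypothetical helper: list (path, size in MB) for files ending with file_suffix."""
    results = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and name.endswith(file_suffix):
            # Assumption: megabytes are computed as bytes / 1024**2.
            size_mb = round(os.path.getsize(path) / (1024 * 1024), 6)
            results.append((path, size_mb))
    return results
```

For instance, `list_files_with_size("/data/GDL", ".jsonl.bz2")` (a made-up directory) would return one `(path, size)` tuple per matching archive.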
- impresso_essentials.io.fs_utils.list_local_directories(path: str) list[str]
List the directories present at a local path.
- Parameters:
path (str) – Local path from which to list the directories.
- Returns:
List of subdirectories in path.
- Return type:
list[str]
- impresso_essentials.io.fs_utils.parse_canonical_filename(filename: str) tuple[str, tuple, str, str, int, str]
Parse a canonical page/audio or CI ID or filename into its components.
>>> filename = "GDL-1950-01-02-a-i0002"
>>> parse_canonical_filename(filename)
('GDL', ('1950', '01', '02'), 'a', 'i', 2, '')
The fourth element (‘i’ in the example above) is the “filetype”, and can have 3 values if defined:
- i: the element is a content-item
- p: the element is a page
- r: the element is an audio record
- Parameters:
filename (str) – ID or filename to parse.
- Returns:
Parsed ID or filename.
- Return type:
tuple[str, tuple, str, str, int, str]
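The decomposition shown in the example can be approximated with a regular expression. This is a sketch only: the pattern below is an assumption about the ID layout (alias, date, edition, type letter, four-digit number, optional trailing extension) and `parse_id_sketch` is a hypothetical name, not the library's parser.

```python
import re

# Assumed layout: [alias]-[yyyy]-[mm]-[dd]-[edition]-[i|p|r][nnnn][extension]
ID_PATTERN = re.compile(
    r"^(?P<alias>[A-Za-z0-9_]+)-(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
    r"-(?P<edition>[a-z]+)-(?P<type>[ipr])(?P<num>\d{4})(?P<ext>.*)$"
)


def parse_id_sketch(filename: str) -> tuple[str, tuple, str, str, int, str]:
    """Hypothetical helper: split a canonical ID or filename into its components."""
    match = ID_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"Not a canonical ID or filename: {filename!r}")
    return (
        match["alias"],
        (match["year"], match["month"], match["day"]),
        match["edition"],
        match["type"],
        int(match["num"]),
        match["ext"],
    )


print(parse_id_sketch("GDL-1950-01-02-a-i0002"))
```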
- impresso_essentials.io.fs_utils.parse_json(filename: str) dict[str, Any]
Load the contents of a JSON file.
- Parameters:
filename (str) – Path to the json file.
- Returns:
Resulting json, contained inside the file
- Return type:
dict[str, Any]
Deleting keys from S3
Simple CLI script to delete keys from S3.
- Usage:
impresso_commons/utils/s3_delete.py --bucket=<b> --prefix=<p>
- Options:
- --bucket=<b>
Target S3 bucket
- --prefix=<p>
Prefix of keys to delete
- impresso_essentials.io.s3_delete.delete_versioned_keys(client: BaseClient, bucket: str, prefix: str, max_keys: int = 1000)
Delete all the keys within a bucket based on a given prefix.
- Parameters:
client (BaseClient) – S3 client.
bucket (str) – Name of the bucket to delete keys from.
prefix (str) – Prefix to the partition from which to delete keys.
max_keys (int, optional) – Max number of keys to delete at once. Defaults to 1000.
- impresso_essentials.io.s3_delete.main()
Setting timestamp metadata on S3 files
This script processes a .jsonl file stored in an S3 bucket to extract the latest timestamp from a specified key in the file’s records. It then updates the S3 object’s metadata with the extracted timestamp. Optionally, the updated metadata can be written to a new S3 location.
- Supported Timestamp Formats:
- For ‘ts’ and ‘timestamp’ keys:
2024-04-05T18:14:47Z (UTC with Z suffix)
2024-04-05T18:14:47 (no timezone info, treated as UTC)
2024-04-05T18:14:47+00:00 (UTC with timezone offset)
2024-04-05T18:14:47+02:00 (any timezone offset, converted to UTC)
- For ‘cdt’ key:
2024-04-05 18:14:47 (space-separated format, treated as UTC)
If no valid timestamp is found in the records, the S3 object’s last modified time is used as a fallback.
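The normalisation of the supported formats to UTC can be sketched with the standard library. `to_utc_iso` is a hypothetical helper written for this illustration; it follows the rules listed above (naive timestamps treated as UTC, offsets converted to UTC) but is not the script's own code.

```python
from datetime import datetime, timezone


def to_utc_iso(value: str, ts_key: str = "ts") -> str:
    """Hypothetical helper: normalise a supported timestamp to 'YYYY-MM-DDTHH:MM:SSZ'."""
    if ts_key == "cdt":
        # Space-separated format, no timezone information.
        parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    else:
        # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11 on.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # No timezone info: treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


for sample in ("2024-04-05T18:14:47Z", "2024-04-05T18:14:47", "2024-04-05T18:14:47+02:00"):
    print(sample, "->", to_utc_iso(sample))
```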
- Usage:
python s3_set_timestamp.py --s3-file s3://bucket/path/file.jsonl.bz2 --metadata-key impresso-last-ts --ts-key ts --all-lines --output s3://bucket/path/output.jsonl
python s3_set_timestamp.py --s3-prefix s3://bucket/path/ --metadata-key impresso-last-ts --ts-key ts --all-lines
- --s3-prefix
The S3 prefix to process multiple .jsonl.bz2 files.
- --s3-file
The S3 URI of a single .jsonl.bz2 file to process.
- --metadata-key
The metadata key to update with the latest timestamp (default: impresso-last-ts).
- --ts-key
The key in the JSONL records to extract the timestamp from (default: ts). Choices: ts, cdt, timestamp.
- --all-lines
If False, only the first timestamp is considered.
- --output
Optional S3 URI for the output file with updated metadata (only for --s3-file).
- --force
Force reprocessing even if metadata is already up-to-date (default: False).
- impresso_essentials.io.s3_set_timestamp.compute_statistics(skipped: int, processed: int)
Compute and log overall statistics for the files processed.
- Parameters:
skipped (int) – Number of files skipped.
processed (int) – Number of files processed.
- Returns:
None
- impresso_essentials.io.s3_set_timestamp.disable_interrupts()
Context manager to temporarily disable keyboard interrupts.
- impresso_essentials.io.s3_set_timestamp.get_last_timestamp(fileobj, ts_key: str, all_lines: bool, fallback_timestamp: str = None) str
Extracts the latest timestamp from a .jsonl file based on the specified key.
- Parameters:
fileobj – The file object or path to the .jsonl file (supports .bz2 compression).
ts_key – The key in the JSONL records to extract the timestamp from.
all_lines – If False, only the first timestamp is considered.
fallback_timestamp – Fallback timestamp to use if no valid timestamp is found in records.
- Returns:
The latest timestamp in ISO 8601 format (e.g., ‘2023-01-01T12:00:00Z’).
- Return type:
str
- Raises:
ValueError – If no valid timestamp is found or the key format is unknown.
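The selection of the latest timestamp, including the all_lines switch and the fallback, can be sketched on an iterable of JSON lines. `latest_timestamp` is a hypothetical helper that only handles ISO-like values; skipping records without the key is an assumption.

```python
import json
from datetime import datetime, timezone
from typing import Iterable, Optional


def latest_timestamp(
    lines: Iterable[str],
    ts_key: str = "ts",
    all_lines: bool = True,
    fallback: Optional[str] = None,
) -> str:
    """Hypothetical helper: return the latest ts_key value as 'YYYY-MM-DDTHH:MM:SSZ'."""
    latest = None
    for line in lines:
        value = json.loads(line).get(ts_key)
        if not value:
            continue  # Assumption: records without the key are skipped.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        if latest is None or parsed > latest:
            latest = parsed
        if not all_lines:
            break  # Only the first timestamp is considered.
    if latest is None:
        if fallback is None:
            raise ValueError("No valid timestamp found and no fallback given.")
        return fallback
    return latest.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
```

Comparing timezone-aware datetimes rather than raw strings keeps the ordering correct when records mix different UTC offsets.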
- impresso_essentials.io.s3_set_timestamp.get_s3_client() client
Creates and returns a boto3 S3 client configured with credentials and endpoint.
- The client is configured using environment variables:
SE_ACCESS_KEY: AWS access key ID.
SE_SECRET_KEY: AWS secret access key.
SE_HOST_URL: S3 endpoint URL (default: https://os.zhdk.cloud.switch.ch/).
SE_REGION: AWS region (default: us-east-1).
- Returns:
A configured S3 client instance.
- Return type:
boto3.client
- impresso_essentials.io.s3_set_timestamp.main()
Parses command-line arguments and triggers the metadata update process.
- This function handles the following arguments:
--s3-prefix: The S3 prefix to process multiple .jsonl.bz2 files.
--s3-file: The S3 URI of a single .jsonl.bz2 file to process.
--metadata-key: The metadata key to update with the latest timestamp.
--ts-key: The key in the JSONL records to extract the timestamp from.
--all-lines: If False, only the first timestamp is considered.
--output: Optional S3 URI for the output file with updated metadata. Only valid with --s3-file.
--force: Force reprocessing even if metadata is already up-to-date.
--report: Report all files missing the specified metadata key.
--report-dirs: Report all directories containing files missing the specified metadata key.
- Returns:
None
- impresso_essentials.io.s3_set_timestamp.report_missing_metadata(s3_prefix: str, metadata_key: str)
Reports all S3 objects matching a given prefix that are missing the specified metadata key.
- Parameters:
s3_prefix – The S3 prefix to search for .jsonl.bz2 files.
metadata_key – The metadata key to check for.
- Returns:
None
- impresso_essentials.io.s3_set_timestamp.report_missing_metadata_dirs(s3_prefix: str, metadata_key: str)
Reports all directories matching a given prefix that contain .jsonl.bz2 files missing the specified metadata key.
- Parameters:
s3_prefix – The S3 prefix to search for .jsonl.bz2 files.
metadata_key – The metadata key to check for.
- Returns:
None
- impresso_essentials.io.s3_set_timestamp.update_metadata_for_prefix(s3_prefix: str, metadata_key: str, ts_key: str, all_lines: bool, force: bool = False)
Updates the metadata for all S3 objects matching a given prefix.
- Parameters:
s3_prefix – The S3 prefix to search for .jsonl.bz2 files.
metadata_key – The metadata key to update with the latest timestamp.
ts_key – The key in the JSONL records to extract the timestamp from.
all_lines – If False, only the first timestamp is considered.
force – Force reprocessing even if metadata is already up-to-date.
- Returns:
None
- Raises:
ValueError – If the prefix does not match any files.
- impresso_essentials.io.s3_set_timestamp.update_metadata_if_needed(s3_uri: str, metadata_key: str, ts_key: str, all_lines: bool, output_s3_uri: str = None, force: bool = False)
Updates the metadata of an S3 object with the latest timestamp from a .jsonl file.
- Parameters:
s3_uri – The S3 URI of the .jsonl file to process.
metadata_key – The metadata key to update with the latest timestamp.
ts_key – The key in the JSONL records to extract the timestamp from.
all_lines – If False, only the first timestamp is considered.
output_s3_uri – Optional S3 URI for the output file with updated metadata.
force – Force reprocessing even if metadata is already up-to-date.
- Returns:
None
- Raises:
ValueError – If the timestamp extraction or metadata update fails.
Adding the provider level to S3 partitions
Command-line script to generate a manifest for an S3 bucket or partition after a processing.
- Usage:
s3_add_provider.py --s3-partition-path=<pp> --log-file=<lf> [--dest-bucket=<db> --remove-src-keys --no-copy --verbose]
Options:
- --s3-partition-path=<pp>
S3 path to the partition to which the provider level should be added. Corresponds to the last partition before the list of media titles, i.e. where the provider is to be added. E.g. “s3://122-rebuilt-final” or “s3://142-processed-data-final/langident/langident_v1-4-4”
- --log-file=<lf>
Path to log file to use.
- --dest-bucket=<db>
Destination bucket in which to copy the data with the provider layer. If not defined, will default to the input bucket (corresponding to simply adding the provider). Eg. “122-rebuilt-staging”.
- --remove-src-keys
Whether to remove the source keys which don’t have the provider after performing the addition and/or copy. If True, the old keys without the provider will be removed. Defaults to False.
- --no-copy
Launch the script in debug mode: no copies are performed, but the copies that would have been made are listed.
- --verbose
Set logging level to DEBUG (by default is INFO).
- impresso_essentials.io.s3_add_provider.add_provider_to_s3_partition(src_bucket: str, dest_bucket: str, exact_partition: str, perform_copy=False, remove_src_keys=False, metadata_directive='COPY') None
Add a provider-level directory to an S3 partition by restructuring file paths.
Iterate over all .jsonl.bz2 files under the given S3 partition and rewrite their keys to include the provider name as a directory level. Optionally copy the files to a destination bucket using the new structure, and/or also delete the original keys.
- Parameters:
src_bucket (str) – Name of the source S3 bucket.
dest_bucket (str) – Name of the destination S3 bucket.
exact_partition (str) – The key prefix representing the S3 partition to process.
perform_copy (bool, optional) – If True, files are copied to the new key structure. If False, only logs the intended actions. Defaults to False.
remove_src_keys (bool, optional) – If True and perform_copy is enabled, removes the source keys after copying (except those with “pages” in the key). Defaults to False.
metadata_directive (str, optional) – Directive passed to s3.copy_object, usually “COPY” or “REPLACE”. Defaults to “COPY”.
- Raises:
AttributeError – If alias or provider cannot be inferred from a file path.
- impresso_essentials.io.s3_add_provider.construct_dest_key(src_key: str, provider: str, og_partition: str, current_alias: str | None = None, found_prov: str | None = None) str
Construct a destination S3 key by inserting the provider into the path structure.
Modify the original source key by adding the provider name at the correct position, depending on whether a partition is defined. If a found_prov is given, the key is returned unchanged, assuming the provider is already present.
- Parameters:
src_key (str) – The original source key (e.g., a file path in S3).
provider (str) – The name of the provider to inject into the key path.
og_partition (str) – The original partition prefix.
current_alias (str | None, optional) – The media alias currently being processed. Used only for logging/debugging. Defaults to None.
found_prov (str | None, optional) – If provided, assumes the provider is already in the path and skips modifying the key. Defaults to None.
- Returns:
The modified destination key with the provider inserted, or the original key if found_prov is set.
- Return type:
str
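The key rewriting can be sketched with string operations. `insert_provider` is a hypothetical helper; it assumes the provider goes directly after the partition prefix (or at the root when no partition is defined), which matches the description above but has not been checked against the implementation.

```python
from typing import Optional


def insert_provider(
    src_key: str, provider: str, og_partition: str, found_prov: Optional[str] = None
) -> str:
    """Hypothetical helper: insert the provider level into an S3 key."""
    if found_prov is not None:
        # Provider already present in the path: leave the key unchanged.
        return src_key
    partition = og_partition.strip("/")
    if partition:
        # Re-attach the remainder of the key after "<partition>/<provider>/".
        rest = src_key.removeprefix(partition).lstrip("/")
        return f"{partition}/{provider}/{rest}"
    return f"{provider}/{src_key.lstrip('/')}"


print(insert_provider("GDL/GDL-1950.jsonl.bz2", "SNL", ""))
print(
    insert_provider(
        "langident/langident_v1-4-4/GDL/GDL-1950.jsonl.bz2",
        "SNL",
        "langident/langident_v1-4-4",
    )
)
```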
- impresso_essentials.io.s3_add_provider.get_alias_from_path(source_path: str, og_partition: str) tuple[str, str | None]
Extract the media alias (and optionally the provider) from a given S3 source path.
Based on the original partition prefix, determine the position of the media alias and whether the provider is included in the path.
The logic distinguishes between: - paths where the alias is directly in the root (e.g., alias/…) - paths that include a provider level (e.g., provider/alias/…)
- Parameters:
source_path (str) – Full S3-like path to a file or folder.
og_partition (str) – The partition/prefix that was originally used (e.g., “stage/”).
- Returns:
- A tuple of:
The media alias string.
The provider name if found, otherwise None.
- Return type:
tuple[str, str | None]
- Raises:
AttributeError – If the alias cannot be identified from the path.
- impresso_essentials.io.s3_add_provider.main()