Input/Output

I/O from and to S3

Reusable functions to read/write data from/to our S3 drive.

impresso_essentials.io.s3.alternative_read_text(s3_key: str, s3_credentials: dict, line_by_line: bool = True) list[str] | str

Read from S3 a line-separated text file (e.g. *.jsonl.bz2).

Note:

The reason for this function is a bug in dask.bag.read_text() which breaks on buckets having >= 1000 keys. It raises a FileNotFoundError.

Parameters:
  • s3_key (str) – Full S3 path to the file to read.

  • s3_credentials (dict) – S3 credentials, IMPRESSO_STORAGEOPT.

  • line_by_line (bool, optional) – Whether to read the file line by line. Defaults to True.

Returns:

Contents of the file, as a list of strings or as one string.

Return type:

list[str] | str
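
To illustrate what line_by_line changes in the return value, here is a minimal sketch that works on an in-memory bz2 payload instead of S3. The helper name read_bz2_text is hypothetical and not part of impresso_essentials; the documented function additionally resolves s3_key with the given credentials, and may treat trailing newlines differently.

```python
from __future__ import annotations

import bz2
import io


def read_bz2_text(raw: bytes, line_by_line: bool = True) -> list[str] | str:
    """Hypothetical helper: decode a bz2-compressed, line-separated text payload."""
    with bz2.open(io.BytesIO(raw), mode="rt", encoding="utf-8") as f:
        if line_by_line:
            # One string per line; trailing newline characters are kept here.
            return f.readlines()
        # The whole file as a single string.
        return f.read()


payload = bz2.compress(b'{"id": "a"}\n{"id": "b"}\n')
lines = read_bz2_text(payload)                     # list of two strings
text = read_bz2_text(payload, line_by_line=False)  # one string
```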

impresso_essentials.io.s3.extract_provider_alias_key(s3_key: str, bucket: str, prov_included: bool = True) tuple[str, str]

Extract the media alias (and provider) that an S3 key corresponds to, given the bucket and partition.

E.g., for inputs in the following format:
  • s3_key: ‘s3://31-passim-rebuilt-staging/passim/[provider]/[alias]/[alias]-[year].jsonl.bz2’

  • bucket: ‘31-passim-rebuilt-staging/passim’

  • prov_included: True

the function returns (provider, alias).

Parameters:
  • s3_key (str) – Full S3 path of a file (as returned by fixed_s3fs_glob).

  • bucket (str) – S3 bucket, including partition, in which the media dirs are.

  • prov_included (bool, optional) – Whether or not the provider level is present in the structure of the provided bucket. Defaults to True.

Returns:

Media alias of the corresponding media, and corresponding provider.

Return type:

tuple[str, str]
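
The key layout described above can be parsed with plain string operations. The following is only a sketch under assumptions: split_provider_alias is a hypothetical name, the (provider, alias) ordering follows the example given for prov_included=True, and returning None for the provider when the level is absent is a choice made here, not documented behaviour.

```python
from __future__ import annotations


def split_provider_alias(
    s3_key: str, bucket: str, prov_included: bool = True
) -> tuple[str | None, str]:
    """Hypothetical helper: return (provider, alias) parsed from an S3 key."""
    # Drop the scheme and the bucket/partition prefix, keeping the relative path.
    relative = s3_key.removeprefix("s3://").removeprefix(bucket.strip("/")).lstrip("/")
    parts = relative.split("/")
    if prov_included:
        # Layout: [provider]/[alias]/[alias]-[year].jsonl.bz2
        return parts[0], parts[1]
    # Layout: [alias]/[alias]-[year].jsonl.bz2 (no provider level)
    return None, parts[0]


key = "s3://31-passim-rebuilt-staging/passim/SNL/GDL/GDL-1950.jsonl.bz2"
print(split_provider_alias(key, "31-passim-rebuilt-staging/passim"))
# ('SNL', 'GDL')
```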

impresso_essentials.io.s3.fetch_files(bucket_name: str, compute: bool = True, file_type: str = 'issues', providers_filter: list[str] | None = None, aliases_filter: list[str] | None = None) tuple[Bag | None, Bag | None] | tuple[list[str] | None, list[str] | None]

Fetch issue and/or page canonical JSON files from an s3 bucket.

If compute=True, the output will be a list of the contents of all files in the bucket for the specified newspapers and type of files. If compute=False, the output will remain in a distributed dask.bag.

For the file type, the possible values are the following:
  • ‘issues’, ‘pages’, ‘audios’: include only bz2 files of the given type.

  • ‘supports’: include all pages and audios bz2 files, returned in element [1] of the tuple.

  • ‘both’: include all types of files, with issues ([0]) and supports (pages and audios) ([1]).

Based on file_type, the issue files, page/audio (“support”) files or both will be returned. In the returned tuple, issues are always in the first element and supports in the second, hence if file_type is not ‘both’, the tuple entry corresponding to the undesired type of files will be None.

Parameters:
  • bucket_name (str) – Name of the s3 bucket to fetch the files from.

  • compute (bool, optional) – Whether to compute result and output as list. Defaults to True.

  • file_type (str, optional) – Type of files to list, possible values are “issues”, “pages”, “audios”, “supports” and “both”. Defaults to “issues”.

  • providers_filter (list[str] | None, optional) – List of providers for which to consider the aliases. If None, aliases_filter will be considered. Defaults to None.

  • aliases_filter (list[str] | None, optional) – List of aliases to consider. If None, all will be considered. Defaults to None.

Raises:

NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘audios’, ‘supports’, ‘both’].

Returns:

[0] Issue files’ contents or None and [1] Page and Audio Record files’ contents or None based on file_type

Return type:

tuple[db.core.Bag|None, db.core.Bag|None] | tuple[list[str]|None, list[str]|None]
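
The mapping between file_type and the two slots of the returned tuple can be summarised in a few lines. This is a sketch of the documented contract only: arrange_output is a hypothetical helper, and the lists stand in for whatever contents or bags the real function produces.

```python
from __future__ import annotations

VALID_TYPES = ("issues", "pages", "audios", "supports", "both")


def arrange_output(file_type: str, issues: list[str], pages: list[str], audios: list[str]):
    """Hypothetical helper: place results in the (issues, supports) tuple slots."""
    if file_type not in VALID_TYPES:
        raise NotImplementedError(f"file_type must be one of {VALID_TYPES}")
    supports = {
        "pages": pages,
        "audios": audios,
        "supports": pages + audios,
        "both": pages + audios,
    }
    # Issues always go in slot [0], supports in slot [1]; unused slots are None.
    first = issues if file_type in ("issues", "both") else None
    second = supports.get(file_type)  # None when file_type == "issues"
    return first, second
```

With this contract, callers can always unpack two values and test each against None, whatever file_type was requested.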

impresso_essentials.io.s3.fixed_s3fs_glob(path: str, suffix: str | None = None, boto3_bucket=None) list[str]

Custom glob function able to list more than 1000 elements on s3 (fix of s3fs).

Note

path should be of the form “[partition]*[suffix or file extensions]”, with the partition potentially including the bucket name. If all files within the partitions should be considered, regardless of their extension, “*” can be omitted. Conversely, path can be of the form “[partition]” if suffix is defined.

Parameters:
  • path (str) – Glob path to the files, optionally including the bucket name. If the bucket name is not included, boto3_bucket should be defined.

  • suffix (str | None, optional) – Suffix or extension of the paths to consider within the bucket. Only used if “*” not found in path. Defaults to None.

  • boto3_bucket (boto3.resources.factory.s3.Bucket, optional) – S3 bucket to look into. Defaults to None.

Returns:

List of filenames within the bucket corresponding to the provided path.

Return type:

list[str]
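
A single S3 listing response holds at most 1000 keys, so listing a large partition means walking through result pages. The sketch below shows one way to do this with a boto3 client paginator. It is not necessarily how fixed_s3fs_glob is implemented (the documented function takes a boto3 Bucket resource), and list_keys is a hypothetical name.

```python
def list_keys(client, bucket: str, prefix: str = "", suffix: str = "") -> list[str]:
    """Hypothetical helper: list every key under a prefix, across all result pages."""
    paginator = client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from a page when nothing matches.
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(suffix):
                keys.append(obj["Key"])
    return keys
```

Here client is expected to be a boto3 S3 client, such as the one returned by get_s3_client().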

impresso_essentials.io.s3.get_bucket(bucket_name: str)

Create a boto3 connection and return the desired bucket.

Note

This function does not ensure that the bucket exists. If this verification is necessary, please prefer using get_or_create_bucket() instead.

Parameters:

bucket_name (str) – Name of the S3 bucket to use.

Returns:

Desired S3 bucket.

Return type:

boto3.resources.factory.s3.Bucket

impresso_essentials.io.s3.get_or_create_bucket(name: str, create: bool = False)

Create a boto3 s3 connection and create or return the requested bucket.

It is possible to ask for creating a new bucket with the specified name (in case it does not exist):

>>> b = get_or_create_bucket('testb', create=False)
>>> b = get_or_create_bucket('testb', create=True)

Parameters:
  • name (str) – Name of the bucket to get or create.

  • create (bool, optional) – Whether to create the bucket if it doesn’t exist. Defaults to False.

Returns:

S3 bucket, fetched or created.

Return type:

boto3.resources.factory.s3.Bucket

impresso_essentials.io.s3.get_s3_client(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') BaseClient

Create S3 boto3 client using environment variables from local .env files.

Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.

Parameters:

host_url (str | None, optional) – URL of the S3 endpoint to connect to. Defaults to “https://os.zhdk.cloud.switch.ch/”.

Raises:
  • e – Argument host_url was not provided and SE_HOST_URL was not in the env.

  • e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.

Returns:

The S3 boto3 client.

Return type:

BaseClient

impresso_essentials.io.s3.get_s3_object_size(bucket_name: str, key: str) int

Get the size of an object (key) in an S3 bucket.

Parameters:
  • bucket_name (str) – The name of the S3 bucket.

  • key (str) – The key (object) whose size you want to retrieve.

Returns:

The size of the object in bytes, or None if the object doesn’t exist.

Return type:

int

impresso_essentials.io.s3.get_s3_resource(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') ServiceResource

Get a boto3 resource object related to an S3 drive.

Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.

Parameters:

host_url (str | None, optional) – URL of the S3 endpoint to connect to. Defaults to “https://os.zhdk.cloud.switch.ch/”.

Raises:
  • e – Argument host_url was not provided and SE_HOST_URL was not in the env.

  • e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.

Returns:

S3 resource associated to the endpoint.

Return type:

ServiceResource

impresso_essentials.io.s3.get_storage_options() dict[str, dict | str]

Load environment variables from local .env files

Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.

Returns:

Credentials to access a S3 endpoint.

Return type:

dict[str, dict | str]
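
As an illustration of what such a credentials dictionary can look like, the sketch below assembles options in the layout accepted by s3fs (key, secret, client_kwargs). The exact keys returned by get_storage_options() are an assumption here, build_storage_options is a hypothetical name, and the sketch reads os.environ directly instead of loading a .env file.

```python
import os


def build_storage_options(host_url: str = "https://os.zhdk.cloud.switch.ch/") -> dict:
    """Hypothetical helper: assemble s3fs-style storage options from the environment."""
    try:
        return {
            "client_kwargs": {"endpoint_url": host_url},
            "key": os.environ["SE_ACCESS_KEY"],
            "secret": os.environ["SE_SECRET_KEY"],
        }
    except KeyError as e:
        # Fail early with the name of the missing variable.
        raise RuntimeError(f"Missing environment variable: {e}") from e
```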

impresso_essentials.io.s3.list_canonical_files(bucket_name: str, file_type: str = 'issues', providers_filter: list[str] | None = None, aliases_filter: list[str] | None = None) tuple[list[str] | None, list[str] | None]

List the canonical files located in a given S3 bucket.

Note

The filters are applied in a hierarchical manner: first at provider level, then at alias level if no provider filter was given.

Note

For the file type, the possible values are the following:
  • “issues”, “pages”, “audios”: include only bz2 files of the given type.

  • “supports”: include all pages and audios bz2 files, returned in element [1] of the tuple.

  • “both”: include all types of files, with issues ([0]) and supports ([1]).

Parameters:
  • bucket_name (str) – S3 bucket name.

  • file_type (str, optional) – Type of files to list, possible values are “issues”, “pages”, “audios”, “supports” and “both”. Defaults to “issues”.

  • providers_filter (list[str] | None, optional) – List of providers for which to consider the aliases. If None, aliases_filter will be considered. Defaults to None.

  • aliases_filter (list[str] | None, optional) – List of aliases to consider. If None, all will be considered. Defaults to None.

Raises:

NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘audios’, ‘supports’, ‘both’].

Returns:

[0] List of issue files or None and

[1] List of page/audio files or None based on file_type

Return type:

tuple[list[str] | None, list[str] | None]
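
The precedence between the two filters can be sketched as follows. select_aliases is a hypothetical helper working on an in-memory provider-to-aliases mapping; it only mirrors the documented rule that aliases_filter is consulted when providers_filter is None.

```python
from __future__ import annotations


def select_aliases(
    available: dict[str, list[str]],
    providers_filter: list[str] | None = None,
    aliases_filter: list[str] | None = None,
) -> list[str]:
    """Hypothetical helper: pick aliases, with the provider filter taking precedence."""
    if providers_filter is not None:
        # Provider level first: keep every alias of the requested providers.
        return sorted(a for p in providers_filter for a in available.get(p, []))
    all_aliases = sorted(a for aliases in available.values() for a in aliases)
    if aliases_filter is not None:
        # The alias level is only consulted when no provider filter was given.
        return [a for a in all_aliases if a in aliases_filter]
    return all_aliases


available = {"BCUL": ["ACI", "AV"], "SNL": ["BDC", "CDV"]}
print(select_aliases(available, providers_filter=["SNL"], aliases_filter=["ACI"]))
# ['BDC', 'CDV']
```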

impresso_essentials.io.s3.list_media_titles(bucket_name: str, s3_client=<botocore.client.S3 object>, page_size: int = 10000, prov_included: bool = True) list[str]

List media titles contained in an s3 bucket with impresso data.

By default, it is considered that the bucket contains the provider level.

Note

25,000 seems to be the maximum PageSize value supported by SwitchEngines’ S3 implementation (ceph).

Parameters:
  • bucket_name (str) – Name of the S3 bucket to consider

  • s3_client (optional) – S3 client to use. Defaults to get_s3_client().

  • page_size (int, optional) – Pagination configuration. Defaults to 10000.

  • prov_included (bool, optional) – Whether the provider level is included in the bucket structure. Defaults to True

Returns:

Sorted list of media titles (aliases) present in the given S3 bucket.

Return type:

list[str]

impresso_essentials.io.s3.list_providers_and_aliases(bucket_name: str, prefix: str = '') list[str]

Lists providers and their alias directories from an S3 bucket.

Traverses the given S3 bucket to identify provider directories and their associated alias subdirectories, under an optional prefix. Returns a dictionary mapping each provider to a sorted list of aliases.

Parameters:
  • bucket_name (str) – The name of the S3 bucket to query.

  • prefix (str, optional) – The prefix path within the bucket to search (should have a trailing /). Defaults to the root (‘’).

Returns:

Dict mapping sorted provider names to lists of alias directory names.

Return type:

dict[str, list[str]]

Raises:

botocore.exceptions.ClientError – If there is an issue communicating with S3.

Example

>>> list_providers_and_aliases('141-processed-data-staging', 'embeddings/images/embeddings_dinov2_v1-0-0/')
{
    'BCUL': ['ACI', 'AV', 'Bombe', 'CL', ..., 'esta', 'ouistiti'],
    ...
    'SNL': ['BDC', 'CDV', ..., 'WHD', 'ZBT']
}
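
The shape of the returned mapping can be reproduced from a flat list of keys. The sketch below is an assumption-laden illustration: group_by_provider is a hypothetical helper, and the real function may instead query S3 "directories" level by level.

```python
from collections import defaultdict


def group_by_provider(keys: list[str], prefix: str = "") -> dict[str, list[str]]:
    """Hypothetical helper: map each provider to its sorted alias directories."""
    grouped = defaultdict(set)
    for key in keys:
        if not key.startswith(prefix):
            continue
        parts = key[len(prefix):].split("/")
        if len(parts) >= 3:  # provider / alias / file
            grouped[parts[0]].add(parts[1])
    # Sort both the providers and the aliases of each provider.
    return {prov: sorted(aliases) for prov, aliases in sorted(grouped.items())}
```
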
impresso_essentials.io.s3.list_s3_directories(bucket_name: str, prefix: str = '') list[str]

Retrieve ‘directory’ names in an S3 bucket given a path prefix.

Depending on the prefix and the specific bucket, this will either list the media providers or the media aliases. For s3 partitions which have the partner layer, the prefix parameter allows selecting a specific partner and listing its associated media aliases.

Parameters:
  • bucket_name (str) – The name of the S3 bucket.

  • prefix (str) – The prefix path within the bucket to search (should include a trailing /). Defaults to the root (‘’).

Returns:

A list of ‘directory’ names found in the specified bucket and prefix.

Return type:

list

impresso_essentials.io.s3.provider_in_path(s3_path=None, bucket=None, prefix='') bool

Determines whether the given S3 path or prefix includes a provider-level directory structure.

Parameters:
  • s3_path (str, optional) – Full S3 URI (e.g., ‘s3://my-bucket/prefix/’). If provided, it overrides bucket and prefix.

  • bucket (str, optional) – S3 bucket name. Required if s3_path is not given.

  • prefix (str, optional) – Prefix (folder path) within the bucket. Ignored if s3_path is provided. Defaults to ‘’.

Returns:

  • True if all immediate subfolders match known providers.

  • False if all match known media aliases.

Return type:

bool

Raises:

AttributeError

  • If neither s3_path nor bucket is provided.

  • If a mix of provider and alias directories is found.

impresso_essentials.io.s3.read_jsonlines(key_name: str, bucket_name: str) Generator

Given the S3 key of a jsonl.bz2 archive, extract and return its lines.

Usage example:

>>> lines = db.from_sequence(read_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('id').take(10)

Parameters:
  • key_name (str) – S3 key, without S3 prefix, but with partitions within.

  • bucket_name (str) – Name of S3 bucket to use.

Raises:

ValueError – The provided key_name does not exist in the provided bucket.

Yields:

Generator – generator yielding lines within the archive one by one.

impresso_essentials.io.s3.read_s3_issues(alias: str, year: str, input_bucket: str, provider: str | None = None, incl_provider: bool = True) list[tuple[IssueDir, dict]]

Read the contents of canonical issues from a given S3 bucket.

By default, it’s considered that the bucket includes the provider in its organization. If that is not the case, incl_provider=False should be set so that the provider is left out of the constructed S3 path. If the provider is not provided, it will be deduced from the alias. In any case, the provider is returned within the IssueDir object.

Parameters:
  • alias (str) – Alias of the media title to read the issues from.

  • year (str) – Target year to read issues from.

  • input_bucket (str) – Bucket from where to fetch the issues.

  • provider (str|None, optional) – Provider for the given alias. Defaults to None.

  • incl_provider (bool, optional) – Whether to include the provider in the S3 path. Defaults to True.

Returns:

List of IssueDirs and the issues’ contents.

Return type:

list[tuple[IssueDir, dict]]

impresso_essentials.io.s3.readtext_jsonlines(key_name: str, bucket_name: str, fields_to_keep: list[str] | None = None) Generator

Given the S3 key of a jsonl.bz2 archive, return its lines textual information.

Only the provided fields (or default ones) will be kept in the returned lines. By default, fields_to_keep = [“id”, “st”, “sm”, “pp”, “rr”, “ts”, “lg”, “tp”, “title”, “ft”]

This can serve as the starting point for pure textual processing. Usage example:

>>> lines = db.from_sequence(readtext_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('ft').take(10)

Parameters:
  • key_name (str) – S3 key, without S3 prefix, but with partitions within.

  • bucket_name (str) – Name of S3 bucket to use.

Raises:

ValueError – The provided key_name does not exist in the provided bucket.

Yields:

Generator – generator yielding reformatted lines within the archive one by one.
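
The field filtering can be pictured with a small generator over an in-memory bz2 payload. iter_text_lines is a hypothetical stand-in that skips the S3 access entirely; whether the real function silently ignores missing fields, as done here, is an assumption.

```python
import bz2
import io
import json
from typing import Iterator

DEFAULT_FIELDS = ["id", "st", "sm", "pp", "rr", "ts", "lg", "tp", "title", "ft"]


def iter_text_lines(raw: bytes, fields_to_keep=None) -> Iterator[str]:
    """Hypothetical helper: yield JSON lines reduced to the requested fields."""
    fields = fields_to_keep or DEFAULT_FIELDS
    with bz2.open(io.BytesIO(raw), mode="rt", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            # Keep only the requested fields that are present in the record.
            yield json.dumps({k: record[k] for k in fields if k in record})


payload = bz2.compress(b'{"id": "GDL-1950-01-02-a-i0002", "ft": "text", "extra": 1}\n')
first = next(iter_text_lines(payload, ["id", "ft"]))
```

Each yielded item is still a JSON string, which is why the usage example above maps json.loads over the lines before plucking a field.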

impresso_essentials.io.s3.s3_glob_with_size(path: str, boto3_bucket=None) list[tuple]

Custom glob function to list S3 objects matching a pattern. This function works around the 1000-object listing limit in S3 by using boto3 directly.

Parameters:
  • path (str) – The S3 path with a wildcard (*) to match files. Example: s3://bucket_name/path/to/files/*.txt.

  • boto3_bucket (boto3.Bucket, optional) – An optional boto3 Bucket object. If not provided, it will be created from the path.

Returns:

A list of tuples containing the full S3 paths of matching files and their sizes in megabytes.

Return type:

list

impresso_essentials.io.s3.s3_iter_bucket(bucket_name: str, prefix: str = '', suffix: str = '', accept_key: ~typing.Callable[[str], bool] = <function <lambda>>) list

Iterate over a bucket, returning all keys with some filtering options.

Note that prefix should include the provider level if the bucket has one.

>>> k = s3_iter_bucket("myBucket", prefix='SNL', suffix=".bz2")
>>> k = s3_iter_bucket("myBucket", prefix='SNL/GDL', accept_key=lambda x: "page" in x)
>>> k = s3_iter_bucket("myBucket",  accept_key=lambda x: "/GDL-" in x)

Note

If suffix is not “”, the used accepting condition will become: lambda key: accept_key(key) and key.endswith(suffix)

Parameters:
  • bucket_name (str) – Name of the S3 bucket to list the contents of

  • prefix (str, optional) – Partition prefix to filter bucket’s keys. Defaults to “”.

  • suffix (str, optional) – Suffix to filter the bucket’s keys. Defaults to “”.

  • accept_key (Callable[[str], bool], optional) – Filtering condition for the keys as a lambda function. Defaults to lambda k: True.

Returns:

List of keys corresponding ot the provided prefix, suffix and accept key.

Return type:

list
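
The way suffix and accept_key combine, as described in the note above, can be written out as a small sketch. build_filter is a hypothetical name and the keys are made-up examples; only the combination rule itself comes from this documentation.

```python
from typing import Callable


def build_filter(
    suffix: str = "", accept_key: Callable[[str], bool] = lambda k: True
) -> Callable[[str], bool]:
    """Hypothetical helper: combine accept_key with the suffix check."""
    if suffix == "":
        return accept_key
    return lambda key: accept_key(key) and key.endswith(suffix)


keys = [
    "SNL/GDL/GDL-1950-pages.jsonl.bz2",
    "SNL/GDL/GDL-1950.txt",
    "BCUL/ACI/ACI-1950-pages.jsonl.bz2",
]
keep = build_filter(suffix=".bz2", accept_key=lambda k: "/GDL-" in k)
selected = [k for k in keys if keep(k)]
print(selected)
# ['SNL/GDL/GDL-1950-pages.jsonl.bz2']
```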

impresso_essentials.io.s3.upload_to_s3(local_path: str, path_within_bucket: str, bucket_name: str) bool

Upload a file to an S3 bucket.

Parameters:
  • local_path (str) – The local file path to upload.

  • path_within_bucket (str) – The path within the bucket where the file will be uploaded.

  • bucket_name (str) – The name of the S3 bucket (without any partitions).

Returns:

True if the upload is successful, False otherwise.

Return type:

bool

I/O from and to file system

Code for parsing impresso’s canonical directory structures.

impresso_essentials.io.fs_utils.canonical_path(issuedir: IssueDir, suffix: str = None, extension: str = None, as_dir: bool = False, incl_provider: bool = True) str

Create a canonical dir, filename or ID from an IssueDir object.

Parameters:
  • issuedir (IssueDir) – IssueDir object to create the path for

  • suffix (str, optional) – Suffix to use which will follow the issue ID. eg. Can be ‘pages’, ‘i0001’ or “p” + str(num).zfill(4). Defaults to None.

  • extension (str, optional) – File extension to use if creating a filename. Defaults to None.

  • as_dir (bool, optional) – Whether the result is a directory (‘/’ separator) or a filename or ID (‘-’ separator). Defaults to False.

  • incl_provider (bool, optional) – Whether to include the provider when as_dir=True. Defaults to True.

Returns:

Constructed canonical ID, filename or canonical path for given IssueDir.

Return type:

str

impresso_essentials.io.fs_utils.check_filenaming(file_basename: str, object_type: str = 'issue') Match[str] | None

Check whether a file’s basename complies with the naming convention.

Parameters:
  • file_basename (str) – Basename of file to check (excluding extension).

  • object_type (str, optional) – Type of objects in the given file. One of “issue”, “page”, “audio”, “rebuilt”. Defaults to ‘issue’.

Returns:

The resulting match if correct, None otherwise.

Return type:

Match[str] | None

impresso_essentials.io.fs_utils.check_id(canonical_id: str, object_type: str = 'issue') Match[str] | None

Check whether a canonical ID complies with the naming convention.

Parameters:
  • canonical_id (str) – Canonical ID to check.

  • object_type (str, optional) – Object it corresponds to. One of “issue”, “page”, “audio”, “content-item”. Defaults to ‘issue’.

Returns:

The resulting match if correct, None otherwise

Return type:

Match[str] | None

impresso_essentials.io.fs_utils.get_issueshortpath(issuedir: IssueDir, incl_provider: bool = True) str

Return short version of an IssueDir’s path, starting from the media alias.

If incl_provider, beware that the provider needs to be

Parameters:
  • issuedir (IssueDir) – IssueDir instance from which to get the short path.

  • incl_provider (bool, optional) – whether to include the provider. Defaults to True.

Returns:

Canonical path to the issue starting at the media alias.

Return type:

str

impresso_essentials.io.fs_utils.glob_with_size(directory: str, file_suffix: str) list[str]

List all files in a directory with a given suffix and their size in MB.

Parameters:
  • directory (str) – The directory path to search for files.

  • file_suffix (str) – The file extension or suffix to match.

Returns:

A list of tuples, each containing the file path and its size in megabytes, rounded to six decimal places.

Return type:

list[str]
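
A minimal local equivalent could look like the sketch below. glob_sizes_mb is a hypothetical name, the search is non-recursive, and one megabyte is taken as 1024 * 1024 bytes; the library may use a different convention.

```python
import os
import tempfile
from glob import glob


def glob_sizes_mb(directory: str, file_suffix: str) -> list[tuple[str, float]]:
    """Hypothetical helper: pair each matching file with its size in MB."""
    paths = sorted(glob(os.path.join(directory, f"*{file_suffix}")))
    # 1 MB is taken as 1024 * 1024 bytes in this sketch.
    return [(p, round(os.path.getsize(p) / (1024 * 1024), 6)) for p in paths]


with tempfile.TemporaryDirectory() as tmp:
    # Write a file of exactly 1 MiB to check the conversion.
    with open(os.path.join(tmp, "a.jsonl.bz2"), "wb") as f:
        f.write(b"\0" * 1024 * 1024)
    listing = glob_sizes_mb(tmp, ".jsonl.bz2")
```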

impresso_essentials.io.fs_utils.list_local_directories(path: str) list[str]

List the directories present at a local path.

Parameters:

path (str) – Local path from which to list the directories.

Returns:

List of subdirectories in path.

Return type:

list[str]

impresso_essentials.io.fs_utils.parse_canonical_filename(filename: str) tuple[str, tuple, str, str, int, str]

Parse a canonical page/audio or CI ID or filename into its components.

>>> filename = "GDL-1950-01-02-a-i0002"
>>> parse_canonical_filename(filename)
('GDL', ('1950', '01', '02'), 'a', 'i', 2, '')

The fourth element is the “filetype”, and can have 3 values if defined:
  • i : the element is a content-item

  • p : the element is a page

  • r : the element is an audio record

Parameters:

filename (str) – ID or filename to parse.

Returns:

Parsed ID or filename.

Return type:

tuple[str, tuple, str, str, int, str]
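
The decomposition shown in the example can be approximated with a regular expression. This is a sketch under assumptions: parse_id and its pattern are hypothetical, real aliases or editions may allow other characters, and the last element is assumed to be whatever follows the four-digit number (such as a file extension).

```python
import re

ID_PATTERN = re.compile(
    r"^(?P<alias>[A-Za-z0-9_]+)-(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
    r"-(?P<edition>[a-z]+)-(?P<type>[ipr])(?P<num>\d{4})(?P<ext>.*)$"
)


def parse_id(filename: str) -> tuple:
    """Hypothetical parser mirroring the documented output shape."""
    m = ID_PATTERN.match(filename)
    if m is None:
        raise ValueError(f"Not a canonical ID or filename: {filename}")
    date = (m["year"], m["month"], m["day"])
    return m["alias"], date, m["edition"], m["type"], int(m["num"]), m["ext"]


print(parse_id("GDL-1950-01-02-a-i0002"))
# ('GDL', ('1950', '01', '02'), 'a', 'i', 2, '')
```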

impresso_essentials.io.fs_utils.parse_json(filename: str) dict[str, Any]

Load the contents of a JSON file.

Parameters:

filename (str) – Path to the json file.

Returns:

Resulting json, contained inside the file

Return type:

dict[str, Any]

Deleting keys from S3

Simple CLI script to delete keys from S3.

Usage:

impresso_commons/utils/s3_delete.py --bucket=<b> --prefix=<p>

Options:
--bucket=<b>

Target S3 bucket

--prefix=<p>

Prefix of keys to delete

impresso_essentials.io.s3_delete.delete_versioned_keys(client: BaseClient, bucket: str, prefix: str, max_keys: int = 1000)

Delete all the keys within a bucket based on a given prefix.

Parameters:
  • client (BaseClient) – S3 client.

  • bucket (str) – Name of the bucket to delete keys from.

  • prefix (str) – Prefix to the partition from which to delete keys.

  • max_keys (int, optional) – Max number of keys to delete at once. Defaults to 1000.

impresso_essentials.io.s3_delete.main()

Setting timestamp metadata on S3 files

This script processes a .jsonl file stored in an S3 bucket to extract the latest timestamp from a specified key in the file’s records. It then updates the S3 object’s metadata with the extracted timestamp. Optionally, the updated metadata can be written to a new S3 location.

Supported Timestamp Formats:
For ‘ts’ and ‘timestamp’ keys:
  • 2024-04-05T18:14:47Z (UTC with Z suffix)

  • 2024-04-05T18:14:47 (no timezone info, treated as UTC)

  • 2024-04-05T18:14:47+00:00 (UTC with timezone offset)

  • 2024-04-05T18:14:47+02:00 (any timezone offset, converted to UTC)

For ‘cdt’ key:
  • 2024-04-05 18:14:47 (space-separated format, treated as UTC)

If no valid timestamp is found in the records, the S3 object’s last modified time is used as a fallback.
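
The formats listed above can all be normalised to UTC with the standard library. The following is a sketch, not the script's actual code: to_utc_iso is a hypothetical helper, and the output format with a trailing Z is chosen to match the ISO 8601 examples in this section.

```python
from datetime import datetime, timezone


def to_utc_iso(value: str, ts_key: str = "ts") -> str:
    """Hypothetical helper: normalize a supported timestamp string to UTC."""
    if ts_key == "cdt":
        # Space-separated format, without timezone information.
        parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    else:
        if value.endswith("Z"):
            # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
            value = value[:-1] + "+00:00"
        parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # No timezone info: treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


print(to_utc_iso("2024-04-05T18:14:47+02:00"))
# 2024-04-05T16:14:47Z
```

Once every timestamp is in this form, picking the latest one reduces to taking the maximum of the parsed values.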

Usage:

python s3_set_timestamp.py --s3-file s3://bucket/path/file.jsonl.bz2 --metadata-key impresso-last-ts --ts-key ts --all-lines --output s3://bucket/path/output.jsonl

python s3_set_timestamp.py --s3-prefix s3://bucket/path/ --metadata-key impresso-last-ts --ts-key ts --all-lines

Options:
--s3-prefix

The S3 prefix to process multiple .jsonl.bz2 files.

--s3-file

The S3 URI of a single .jsonl.bz2 file to process.

--metadata-key

The metadata key to update with the latest timestamp (default: impresso-last-ts).

--ts-key

The key in the JSONL records to extract the timestamp from (default: ts). Choices: ts, cdt, timestamp.

--all-lines

If False, only the first timestamp is considered.

--output

Optional S3 URI for the output file with updated metadata (only for --s3-file).

--force

Force reprocessing even if metadata is already up-to-date (default: False).

impresso_essentials.io.s3_set_timestamp.compute_statistics(skipped: int, processed: int)

Compute and log overall statistics for the files processed.

Parameters:
  • skipped (int) – Number of files skipped.

  • processed (int) – Number of files processed.

Returns:

None

impresso_essentials.io.s3_set_timestamp.disable_interrupts()

Context manager to temporarily disable keyboard interrupts.

impresso_essentials.io.s3_set_timestamp.get_last_timestamp(fileobj, ts_key: str, all_lines: bool, fallback_timestamp: str = None) str

Extracts the latest timestamp from a .jsonl file based on the specified key.

Parameters:
  • fileobj – The file object or path to the .jsonl file (supports .bz2 compression).

  • ts_key – The key in the JSONL records to extract the timestamp from.

  • all_lines – If False, only the first timestamp is considered.

  • fallback_timestamp – Fallback timestamp to use if no valid timestamp is found in records.

Returns:

The latest timestamp in ISO 8601 format (e.g., ‘2023-01-01T12:00:00Z’).

Return type:

str

Raises:

ValueError – If no valid timestamp is found or the key format is unknown.

impresso_essentials.io.s3_set_timestamp.get_s3_client() client

Creates and returns a boto3 S3 client configured with credentials and endpoint.

The client is configured using environment variables:
  • SE_ACCESS_KEY: AWS access key ID.

  • SE_SECRET_KEY: AWS secret access key.

  • SE_HOST_URL: S3 endpoint URL (default: https://os.zhdk.cloud.switch.ch/).

  • SE_REGION: AWS region (default: us-east-1).

Returns:

A configured S3 client instance.

Return type:

boto3.client

impresso_essentials.io.s3_set_timestamp.main()

Parses command-line arguments and triggers the metadata update process.

This function handles the following arguments:
  • --s3-prefix: The S3 prefix to process multiple .jsonl.bz2 files.

  • --s3-file: The S3 URI of a single .jsonl.bz2 file to process.

  • --metadata-key: The metadata key to update with the latest timestamp.

  • --ts-key: The key in the JSONL records to extract the timestamp from.

  • --all-lines: If False, only the first timestamp is considered.

  • --output: Optional S3 URI for the output file with updated metadata. Only valid with --s3-file.

  • --force: Force reprocessing even if metadata is already up-to-date.

  • --report: Report all files missing the specified metadata key.

  • --report-dirs: Report all directories containing files missing the specified metadata key.

Returns:

None

impresso_essentials.io.s3_set_timestamp.report_missing_metadata(s3_prefix: str, metadata_key: str)

Reports all S3 objects matching a given prefix that are missing the specified metadata key.

Parameters:
  • s3_prefix – The S3 prefix to search for .jsonl.bz2 files.

  • metadata_key – The metadata key to check for.

Returns:

None

impresso_essentials.io.s3_set_timestamp.report_missing_metadata_dirs(s3_prefix: str, metadata_key: str)

Reports all directories matching a given prefix that contain .jsonl.bz2 files missing the specified metadata key.

Parameters:
  • s3_prefix – The S3 prefix to search for .jsonl.bz2 files.

  • metadata_key – The metadata key to check for.

Returns:

None

impresso_essentials.io.s3_set_timestamp.update_metadata_for_prefix(s3_prefix: str, metadata_key: str, ts_key: str, all_lines: bool, force: bool = False)

Updates the metadata for all S3 objects matching a given prefix.

Parameters:
  • s3_prefix – The S3 prefix to search for .jsonl.bz2 files.

  • metadata_key – The metadata key to update with the latest timestamp.

  • ts_key – The key in the JSONL records to extract the timestamp from.

  • all_lines – If False, only the first timestamp is considered.

  • force – Force reprocessing even if metadata is already up-to-date.

Returns:

None

Raises:

ValueError – If the prefix does not match any files.

impresso_essentials.io.s3_set_timestamp.update_metadata_if_needed(s3_uri: str, metadata_key: str, ts_key: str, all_lines: bool, output_s3_uri: str = None, force: bool = False)

Updates the metadata of an S3 object with the latest timestamp from a .jsonl file.

Parameters:
  • s3_uri – The S3 URI of the .jsonl file to process.

  • metadata_key – The metadata key to update with the latest timestamp.

  • ts_key – The key in the JSONL records to extract the timestamp from.

  • all_lines – If False, only the first timestamp is considered.

  • output_s3_uri – Optional S3 URI for the output file with updated metadata.

  • force – Force reprocessing even if metadata is already up-to-date.

Returns:

None

Raises:

ValueError – If the timestamp extraction or metadata update fails.

Adding the provider level to S3 partitions

Command-line script to add the provider level to the keys of an S3 bucket or partition.

Usage:

s3_add_provider.py --s3-partition-path=<pp> --log-file=<lf> [--dest-bucket=<db> --remove-src-keys --no-copy --verbose]

Options:

--s3-partition-path=<pp>

S3 path to the partition to which the provider level should be added. Corresponds to the last partition before the list of media titles, where the provider is to be added. Eg. “s3://122-rebuilt-final” or “s3://142-processed-data-final/langident/langident_v1-4-4”.

--log-file=<lf>

Path to log file to use.

--dest-bucket=<db>

Destination bucket in which to copy the data with the provider layer. If not defined, will default to the input bucket (corresponding to simply adding the provider). Eg. “122-rebuilt-staging”.

--remove-src-keys

Whether to remove the source keys which don’t have the provider after performing the addition and/or copy. If True, the old keys without the provider will be removed. Defaults to False.

--no-copy

Launch the script in debug mode: no copies are performed, but the copies which would have been done are listed.

--verbose

Set logging level to DEBUG (by default is INFO).

impresso_essentials.io.s3_add_provider.add_provider_to_s3_partition(src_bucket: str, dest_bucket: str, exact_partition: str, perform_copy=False, remove_src_keys=False, metadata_directive='COPY') None

Add a provider-level directory to an S3 partition by restructuring file paths.

Iterate over all .jsonl.bz2 files under the given S3 partition and rewrite their keys to include the provider name as a directory level. Optionally copy the files to a destination bucket using the new structure, and/or also delete the original keys.

Parameters:
  • src_bucket (str) – Name of the source S3 bucket.

  • dest_bucket (str) – Name of the destination S3 bucket.

  • exact_partition (str) – The key prefix representing the S3 partition to process.

  • perform_copy (bool, optional) – If True, files are copied to the new key structure. If False, only logs the intended actions. Defaults to False.

  • remove_src_keys (bool, optional) – If True and perform_copy is enabled, removes the source keys after copying (except those with “pages” in the key). Defaults to False.

  • metadata_directive (str, optional) – Directive passed to s3.copy_object, usually “COPY” or “REPLACE”. Defaults to “COPY”.

Raises:

AttributeError – If alias or provider cannot be inferred from a file path.

impresso_essentials.io.s3_add_provider.construct_dest_key(src_key: str, provider: str, og_partition: str, current_alias: str | None = None, found_prov: str | None = None) str

Construct a destination S3 key by inserting the provider into the path structure.

Modify the original source key by adding the provider name at the correct position, depending on whether a partition is defined. If a found_prov is given, the key is returned unchanged, assuming the provider is already present.

Parameters:
  • src_key (str) – The original source key (e.g., a file path in S3).

  • provider (str) – The name of the provider to inject into the key path.

  • og_partition (str) – The original partition prefix.

  • current_alias (str | None, optional) – The media alias currently being processed. Used only for logging/debugging. Defaults to None.

  • found_prov (str | None, optional) – If provided, assumes the provider is already in the path and skips modifying the key. Defaults to None.

Returns:

The modified destination key with the provider inserted, or the original key if found_prov is set.

Return type:

str
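
The key rewriting described above can be sketched with plain string handling. insert_provider is a hypothetical name, and the sketch assumes that src_key starts with the partition prefix; the real function also accepts current_alias for logging, which is left out here.

```python
from __future__ import annotations


def insert_provider(
    src_key: str, provider: str, og_partition: str, found_prov: str | None = None
) -> str:
    """Hypothetical sketch: add the provider level right after the partition prefix."""
    if found_prov is not None:
        # The provider level is already present: leave the key untouched.
        return src_key
    key = src_key.lstrip("/")
    partition = og_partition.strip("/")
    if not partition:
        # No partition: the provider becomes the first path component.
        return f"{provider}/{key}"
    if not key.startswith(partition):
        raise ValueError(f"{src_key} is not under partition {og_partition}")
    remainder = key[len(partition):].lstrip("/")
    return f"{partition}/{provider}/{remainder}"


print(insert_provider(
    "langident/langident_v1-4-4/GDL/GDL-1950.jsonl.bz2", "SNL", "langident/langident_v1-4-4"
))
# langident/langident_v1-4-4/SNL/GDL/GDL-1950.jsonl.bz2
```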

impresso_essentials.io.s3_add_provider.get_alias_from_path(source_path: str, og_partition: str) tuple[str, str | None]

Extract the media alias (and optionally the provider) from a given S3 source path.

Based on the original partition prefix, determine the position of the media alias and whether the provider is included in the path.

The logic distinguishes between: - paths where the alias is directly in the root (e.g., alias/…) - paths that include a provider level (e.g., provider/alias/…)

Parameters:
  • source_path (str) – Full S3-like path to a file or folder.

  • og_partition (str) – The partition/prefix that was originally used (e.g., “stage/”).

Returns:

A tuple of:
  • The media alias string.

  • The provider name if found, otherwise None.

Return type:

tuple[str, str | None]

Raises:

AttributeError – If the alias cannot be identified from the path.

impresso_essentials.io.s3_add_provider.main()