Input/Output

I/O from and to S3

Reusable functions to read/write data from/to our S3 drive.

impresso_essentials.io.s3.IssueDir

alias of IssueDirectory

impresso_essentials.io.s3.alternative_read_text(s3_key: str, s3_credentials: dict, line_by_line: bool = True) list[str] | str

Read from S3 a line-separated text file (e.g. *.jsonl.bz2).

Note:

This function exists because of a bug in dask.bag.read_text(), which raises a FileNotFoundError on buckets holding 1000 keys or more.

Parameters:
  • s3_key (str) – Full S3 path to the file to read.

  • s3_credentials (dict) – S3 credentials, IMPRESSO_STORAGEOPT.

  • line_by_line (bool, optional) – Whether to read the file line by line. Defaults to True.

Returns:

Contents of the file, as a list of strings or as one string.

Return type:

list[str] | str
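The two return shapes can be illustrated without any S3 access. The following is a minimal sketch, not the library's implementation: read_text_from_bytes is a hypothetical helper that works on in-memory bz2 data and assumes UTF-8 encoded content.

```python
import bz2
from typing import Union


def read_text_from_bytes(raw: bytes, line_by_line: bool = True) -> Union[list[str], str]:
    """Decompress bz2 data and return it as lines or as a single string.

    Hypothetical stand-in that only mimics the return shapes documented above.
    """
    text = bz2.decompress(raw).decode("utf-8")
    # splitlines() drops the newline character at the end of each line.
    return text.splitlines() if line_by_line else text


# Build a tiny in-memory "*.jsonl.bz2" payload for the demonstration.
payload = bz2.compress(b'{"id": "a"}\n{"id": "b"}\n')

lines = read_text_from_bytes(payload)                      # list of two strings
whole = read_text_from_bytes(payload, line_by_line=False)  # one string
```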

impresso_essentials.io.s3.fetch_files(bucket_name: str, compute: bool = True, file_type: str = 'issues', newspapers_filter: list[str] | None = None) tuple[Bag | None, Bag | None] | tuple[list[str] | None, list[str] | None]

Fetch issue and/or page canonical JSON files from an s3 bucket.

If compute=True, the output will be a list of the contents of all files in the bucket for the specified newspapers and type of files. If compute=False, the output will remain in a distributed dask.bag.

Based on file_type, the issue files, page files or both will be returned. In the returned tuple, issues are always in the first element and pages in the second, hence if file_type is not ‘both’, the tuple entry corresponding to the undesired type of files will be None.

Parameters:
  • bucket_name (str) – Name of the s3 bucket to fetch the files from.

  • compute (bool, optional) – Whether to compute result and output as list. Defaults to True.

  • file_type (str, optional) – Type of files to list, possible values are “issues”, “pages” and “both”. Defaults to “issues”.

  • newspapers_filter (list[str] | None, optional) – List of newspapers to consider. If None, all will be considered. Defaults to None.

Raises:

NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘both’].

Returns:

[0] Issue files’ contents or None and [1] Page files’ contents or None based on file_type

Return type:

tuple[db.core.Bag|None, db.core.Bag|None] | tuple[list[str]|None, list[str]|None]
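The position-based tuple convention described above can be sketched as follows. select_by_file_type is a hypothetical helper written for illustration only; it works on plain lists and does not touch S3 or dask.

```python
from typing import Optional


def select_by_file_type(
    issues: list[str], pages: list[str], file_type: str = "issues"
) -> tuple[Optional[list[str]], Optional[list[str]]]:
    """Return (issues, pages), with None in the slot that was not requested."""
    if file_type == "issues":
        return issues, None
    if file_type == "pages":
        return None, pages
    if file_type == "both":
        return issues, pages
    raise NotImplementedError(
        f"file_type must be 'issues', 'pages' or 'both', got {file_type!r}"
    )


# Issues are always unpacked first and pages second, whatever was requested.
issue_files, page_files = select_by_file_type(["issue-1"], ["page-1"], file_type="pages")
```

Unpacking both elements and checking for None keeps calling code independent of the requested file_type.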

impresso_essentials.io.s3.fixed_s3fs_glob(path: str, suffix: str | None = None, boto3_bucket=None) list[str]

Custom glob function able to list more than 1000 elements on s3 (fix of s3fs).

Note

path should be of the form “[partition]*[suffix or file extensions]”, with the partition potentially including the bucket name. If all files within the partitions should be considered, regardless of their extension, “*” can be omitted. Conversely, path can be of the form “[partition]” if suffix is defined.

Parameters:
  • path (str) – Glob path to the files, optionally including the bucket name. If the bucket name is not included, boto3_bucket should be defined.

  • suffix (str | None, optional) – Suffix or extension of the paths to consider within the bucket. Only used if “*” not found in path. Defaults to None.

  • boto3_bucket (boto3.resources.factory.s3.Bucket, optional) – S3 bucket to look into. Defaults to None.

Returns:

List of filenames within the bucket corresponding to the provided path.

Return type:

list[str]
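The accepted path forms can be illustrated on a plain list of keys. This is a minimal sketch under stated assumptions: split_glob_path and filter_keys are hypothetical helpers, bucket-name handling is left out, and the matching is reduced to a prefix and suffix test.

```python
from typing import Optional


def split_glob_path(path: str, suffix: Optional[str] = None) -> tuple[str, Optional[str]]:
    """Split "[partition]*[suffix]" into (partition, suffix).

    The suffix argument is only used when the path contains no "*".
    """
    if "*" in path:
        prefix, found_suffix = path.split("*", 1)
        return prefix, found_suffix
    return path, suffix


def filter_keys(keys: list[str], path: str, suffix: Optional[str] = None) -> list[str]:
    """Keep the keys that start with the partition and end with the suffix."""
    prefix, wanted_suffix = split_glob_path(path, suffix)
    return [
        key for key in keys
        if key.startswith(prefix) and (not wanted_suffix or key.endswith(wanted_suffix))
    ]


keys = ["GDL/1950/issues.jsonl.bz2", "GDL/1950/notes.txt", "JDG/1950/issues.jsonl.bz2"]
matched = filter_keys(keys, "GDL/*.jsonl.bz2")          # "[partition]*[suffix]"
same = filter_keys(keys, "GDL/", suffix=".jsonl.bz2")   # "[partition]" plus suffix
everything = filter_keys(keys, "GDL/")                  # no "*" and no suffix
```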

impresso_essentials.io.s3.get_bucket(bucket_name: str)

Create a boto3 connection and return the desired bucket.

Note

This function does not ensure that the bucket exists. If this verification is necessary, please prefer using get_or_create_bucket() instead.

Parameters:

bucket_name (str) – Name of the S3 bucket to use.

Returns:

Desired S3 bucket.

Return type:

boto3.resources.factory.s3.Bucket

impresso_essentials.io.s3.get_or_create_bucket(name: str, create: bool = False)

Create a boto3 s3 connection and create or return the requested bucket.

It is possible to ask for a new bucket with the specified name to be created (in case it does not exist):

>>> b = get_or_create_bucket('testb', create=False)
>>> b = get_or_create_bucket('testb', create=True)

Parameters:
  • name (str) – Name of the bucket to get or create.

  • create (bool, optional) – Whether to create the bucket if it doesn’t exist. Defaults to False.

Returns:

S3 bucket, fetched or created.

Return type:

boto3.resources.factory.s3.Bucket

impresso_essentials.io.s3.get_s3_client(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') BaseClient

Create S3 boto3 client using environment variables from local .env files.

Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.

Parameters:

host_url (str | None, optional) – URL of the S3 host to connect to. Defaults to “https://os.zhdk.cloud.switch.ch/”.

Raises:
  • e – Argument host_url was not provided and SE_HOST_URL was not in the env.

  • e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.

Returns:

The S3 boto3 client.

Return type:

BaseClient
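The environment checks listed under Raises can be sketched as below. This is an illustration only: resolve_s3_settings is a hypothetical helper, the environment is passed in as a mapping so that nothing is read from a real .env file, and KeyError is an assumed exception type since the reference above does not name one.

```python
from typing import Mapping, Optional


def resolve_s3_settings(host_url: Optional[str], env: Mapping[str, str]) -> dict[str, str]:
    """Collect endpoint and credentials, failing early when something is missing."""
    # Fall back to SE_HOST_URL when no host_url argument is given.
    endpoint = host_url or env.get("SE_HOST_URL")
    if endpoint is None:
        raise KeyError("host_url was not provided and SE_HOST_URL is not set")
    missing = [name for name in ("SE_ACCESS_KEY", "SE_SECRET_KEY") if name not in env]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        "endpoint_url": endpoint,
        "access_key": env["SE_ACCESS_KEY"],
        "secret_key": env["SE_SECRET_KEY"],
    }


# Placeholder credentials, used for the demonstration only.
fake_env = {"SE_ACCESS_KEY": "my-key", "SE_SECRET_KEY": "my-secret"}
settings = resolve_s3_settings("https://os.zhdk.cloud.switch.ch/", fake_env)
```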

impresso_essentials.io.s3.get_s3_object_size(bucket_name: str, key: str) int

Get the size of an object (key) in an S3 bucket.

Parameters:
  • bucket_name (str) – The name of the S3 bucket.

  • key (str) – The key (object) whose size you want to retrieve.

Returns:

The size of the object in bytes, or None if the object doesn’t exist.

Return type:

int

impresso_essentials.io.s3.get_s3_resource(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') ServiceResource

Get a boto3 resource object related to an S3 drive.

Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.

Parameters:

host_url (str | None, optional) – URL of the S3 host to connect to. Defaults to “https://os.zhdk.cloud.switch.ch/”.

Raises:
  • e – Argument host_url was not provided and SE_HOST_URL was not in the env.

  • e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.

Returns:

S3 resource associated to the endpoint.

Return type:

ServiceResource

impresso_essentials.io.s3.get_storage_options() dict[str, dict | str]

Load the S3 credentials from environment variables defined in local .env files.

Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.

Returns:

Credentials to access an S3 endpoint.

Return type:

dict[str, dict | str]
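A dictionary matching the dict[str, dict | str] return type could look like the sketch below. The exact keys returned by the library are not listed in this reference; the layout shown is an assumption that follows the keyword names accepted by s3fs (key, secret, client_kwargs), and build_storage_options is a hypothetical helper.

```python
def build_storage_options(access_key: str, secret_key: str, endpoint_url: str) -> dict:
    """Assemble credentials in the keyword layout accepted by s3fs (assumed layout)."""
    return {
        "client_kwargs": {"endpoint_url": endpoint_url},  # nested dict value
        "key": access_key,                                # plain string values
        "secret": secret_key,
    }


# Placeholder credentials, used for the demonstration only.
options = build_storage_options("my-key", "my-secret", "https://os.zhdk.cloud.switch.ch/")
# With dask installed, such a dict would typically be passed as
# dask.bag.read_text(path, storage_options=options).
```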

impresso_essentials.io.s3.list_files(bucket_name: str, file_type: str = 'issues', newspapers_filter: list[str] | None = None) tuple[list[str] | None, list[str] | None]

List the canonical files located in a given S3 bucket.

Parameters:
  • bucket_name (str) – S3 bucket name.

  • file_type (str, optional) – Type of files to list, possible values are “issues”, “pages” and “both”. Defaults to “issues”.

  • newspapers_filter (list[str] | None, optional) – List of newspapers to consider. If None, all will be considered. Defaults to None.

Raises:

NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘both’].

Returns:

[0] List of issue files or None and [1] List of page files or None based on file_type

Return type:

tuple[list[str] | None, list[str] | None]

impresso_essentials.io.s3.list_newspapers(bucket_name: str, s3_client=<botocore.client.S3 object>, page_size: int = 10000) list[str]

List newspapers contained in an s3 bucket with impresso data.

Note

25,000 seems to be the maximum PageSize value supported by SwitchEngines’ S3 implementation (ceph).

Parameters:
  • bucket_name (str) – Name of the S3 bucket to consider

  • s3_client (optional) – S3 client to use. Defaults to get_s3_client().

  • page_size (int, optional) – Pagination configuration. Defaults to 10000.

Returns:

List of newspaper (aliases) present in the given S3 bucket.

Return type:

list[str]

impresso_essentials.io.s3.list_s3_directories(bucket_name: str, prefix: str = '') list[str]

Retrieve ‘directory’ names (media titles) in an S3 bucket given a path prefix.

Parameters:
  • bucket_name (str) – The name of the S3 bucket.

  • prefix (str) – The prefix path within the bucket to search. Default is the root (‘’).

Returns:

A list of ‘directory’ names found in the specified bucket and prefix.

Return type:

list
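S3 has no real directories, so ‘directory’ names are derived from the “/” separators in the keys. The sketch below reproduces that idea on a plain list of keys; list_directories is a hypothetical helper and does not query S3.

```python
def list_directories(keys: list[str], prefix: str = "") -> list[str]:
    """Return the sorted first-level 'directory' names found under prefix."""
    directories = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        # Only keys that continue past a "/" live inside a 'directory'.
        if "/" in remainder:
            directories.add(remainder.split("/", 1)[0])
    return sorted(directories)


keys = ["GDL/1950/a.json", "GDL/1951/b.json", "JDG/1950/c.json", "README.txt"]
top_level = list_directories(keys)             # media titles at the bucket root
years = list_directories(keys, prefix="GDL/")  # 'directories' below one title
```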

impresso_essentials.io.s3.read_jsonlines(key_name: str, bucket_name: str) Generator

Given the S3 key of a jsonl.bz2 archive, extract and return its lines.

Usage example:

>>> lines = db.from_sequence(read_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('id').take(10)

Parameters:
  • key_name (str) – S3 key, without S3 prefix, but with partitions within.

  • bucket_name (str) – Name of S3 bucket to use.

Raises:

ValueError – The provided key_name does not exist in the provided bucket.

Yields:

Generator – generator yielding lines within the archive one by one.
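The generator behaviour can be sketched locally on an in-memory archive. iter_jsonl_bz2 is a hypothetical helper that takes raw bytes instead of an S3 key and assumes UTF-8 content; it is not the library's implementation.

```python
import bz2
import io
import json
from typing import Iterator


def iter_jsonl_bz2(raw: bytes) -> Iterator[str]:
    """Yield the lines of a bz2-compressed JSON Lines payload one by one."""
    # bz2.open decompresses lazily, so the lines are produced as they are read.
    with bz2.open(io.BytesIO(raw), mode="rt", encoding="utf-8") as stream:
        for line in stream:
            yield line.rstrip("\n")


archive = bz2.compress(b'{"id": "GDL-1950-01-02-a"}\n{"id": "GDL-1950-01-03-a"}\n')
ids = [json.loads(line)["id"] for line in iter_jsonl_bz2(archive)]
```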

impresso_essentials.io.s3.read_s3_issues(newspaper: str, year: str, input_bucket: str) list[tuple[IssueDirectory, dict]]

Read the contents of canonical issues from a given S3 bucket.

Parameters:
  • newspaper (str) – Name of the newspaper to read the issues from.

  • year (str) – Target year to read issues from.

  • input_bucket (str) – Bucket from where to fetch the issues.

Returns:

List of IssueDirs and the issues’ contents.

Return type:

list[tuple[IssueDir, dict]]

impresso_essentials.io.s3.readtext_jsonlines(key_name: str, bucket_name: str, fields_to_keep: list[str] | None = None) Generator

Given the S3 key of a jsonl.bz2 archive, return its lines textual information.

Only the provided fields (or default ones) will be kept in the returned lines. By default, fields_to_keep = [“id”, “pp”, “ts”, “lg”, “tp”, “t”, “ft”].

This can serve as the starting point for pure textual processing. Usage example:

>>> lines = db.from_sequence(readtext_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('ft').take(10)

Parameters:
  • key_name (str) – S3 key, without S3 prefix, but with partitions within.

  • bucket_name (str) – Name of S3 bucket to use.

Raises:

ValueError – The provided key_name does not exist in the provided bucket.

Yields:

Generator – generator yielding reformatted lines within the archive one by one.
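The field filtering can be sketched on a single JSON line. keep_fields is a hypothetical helper; skipping fields that are absent from a record is an assumption, since the reference does not say how missing fields are handled.

```python
import json
from typing import Optional

# Default list of fields, as given in the description above.
DEFAULT_FIELDS = ["id", "pp", "ts", "lg", "tp", "t", "ft"]


def keep_fields(line: str, fields_to_keep: Optional[list[str]] = None) -> str:
    """Re-serialise a JSON line with only the requested fields."""
    fields = DEFAULT_FIELDS if fields_to_keep is None else fields_to_keep
    record = json.loads(line)
    # Assumption: fields missing from the record are silently skipped.
    return json.dumps({name: record[name] for name in fields if name in record})


raw_line = json.dumps(
    {"id": "GDL-1950-01-02-a-i0002", "lg": "fr", "ft": "Some text", "extra": 1}
)
slim = json.loads(keep_fields(raw_line))
only_text = json.loads(keep_fields(raw_line, ["ft"]))
```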

impresso_essentials.io.s3.s3_glob_with_size(path: str, boto3_bucket=None)

Custom glob function to list S3 objects matching a pattern. This function works around the 1000-object listing limit in S3 by using boto3 directly.

Parameters:
  • path (str) – The S3 path with a wildcard (*) to match files. Example: s3://bucket_name/path/to/files/*.txt.

  • boto3_bucket (boto3.Bucket, optional) – An optional boto3 Bucket object. If not provided, it will be created from the path.

Returns:

A list of tuples containing the full S3 paths of matching files and their sizes in megabytes.

Return type:

list

impresso_essentials.io.s3.s3_iter_bucket(bucket_name: str, prefix: str = '', suffix: str = '', accept_key: ~typing.Callable[[str], bool] = <function <lambda>>) list

Iterate over a bucket, returning all keys with some filtering options.

>>> k = s3_iter_bucket("myBucket", prefix='GDL', suffix=".bz2")
>>> k = s3_iter_bucket("myBucket", prefix='GDL', accept_key=lambda x: "page" in x)

Note

If suffix is not “”, the used accepting condition will become: lambda key: accept_key(key) and key.endswith(suffix)

Parameters:
  • bucket_name (str) – Name of the S3 bucket to list the contents of

  • prefix (str, optional) – Partition prefix to filter bucket’s keys. Defaults to “”.

  • suffix (str, optional) – Suffix to filter the bucket’s keys. Defaults to “”.

  • accept_key (Callable[[str], bool], optional) – Filtering condition for the keys as a lambda function. Defaults to lambda k: True.

Returns:

List of keys corresponding to the provided prefix, suffix and accept key.

Return type:

list
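How suffix and accept_key combine can be sketched on a plain list of keys. filter_bucket_keys is a hypothetical helper that applies the accepting condition from the note above without connecting to S3.

```python
from typing import Callable


def filter_bucket_keys(
    keys: list[str],
    prefix: str = "",
    suffix: str = "",
    accept_key: Callable[[str], bool] = lambda k: True,
) -> list[str]:
    """Filter keys by prefix, suffix and a custom predicate."""
    if suffix != "":
        # Same combination as in the note above: both conditions must hold.
        def condition(key: str) -> bool:
            return accept_key(key) and key.endswith(suffix)
    else:
        condition = accept_key
    return [key for key in keys if key.startswith(prefix) and condition(key)]


all_keys = [
    "GDL/1950/GDL-1950-issues.jsonl.bz2",
    "GDL/1950/GDL-1950-01-02-a-pages.jsonl.bz2",
    "GDL/1950/readme.txt",
    "JDG/1950/JDG-1950-issues.jsonl.bz2",
]
bz2_keys = filter_bucket_keys(all_keys, prefix="GDL", suffix=".bz2")
page_keys = filter_bucket_keys(all_keys, prefix="GDL", accept_key=lambda x: "page" in x)
```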

impresso_essentials.io.s3.upload_to_s3(local_path: str, path_within_bucket: str, bucket_name: str) bool

Upload a file to an S3 bucket.

Parameters:
  • local_path (str) – The local file path to upload.

  • path_within_bucket (str) – The path within the bucket where the file will be uploaded.

  • bucket_name (str) – The name of the S3 bucket (without any partitions).

Returns:

True if the upload is successful, False otherwise.

Return type:

bool

I/O from and to file system

Code for parsing impresso’s canonical directory structures.

impresso_essentials.io.fs_utils.canonical_path(issuedir: IssueDir, suffix: str = None, extension: str = None, as_dir: bool = False) str

Create a canonical dir, filename or ID from an IssueDir object.

Parameters:
  • issuedir (IssueDir) – IssueDir object to create the path for

  • suffix (str, optional) – Suffix to use, which will follow the issue ID, e.g. ‘pages’, ‘i0001’ or “p” + str(num).zfill(4). Defaults to None.

  • extension (str, optional) – File extension to use if creating a filename. Defaults to None.

  • as_dir (bool, optional) – Whether the result is a directory (‘/’ separator) or a filename or ID (‘-’ separator). Defaults to False.

Returns:

Constructed canonical ID, filename or canonical path for given IssueDir.

Return type:

str
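The effect of the separator and suffix options can be sketched as follows. This is an illustration under assumptions: IssueDirSketch is a hypothetical stand-in whose field names (journal, date, edition) are not confirmed by this reference, and the extension is assumed to be passed with its leading dot.

```python
from collections import namedtuple
from datetime import date
from typing import Optional

# Hypothetical stand-in for IssueDir; the field names are assumptions.
IssueDirSketch = namedtuple("IssueDirSketch", ["journal", "date", "edition"])


def canonical_path_sketch(
    issuedir: IssueDirSketch,
    suffix: Optional[str] = None,
    extension: Optional[str] = None,
    as_dir: bool = False,
) -> str:
    """Join the issue components with '/' (directory) or '-' (filename or ID)."""
    sep = "/" if as_dir else "-"
    parts = [
        issuedir.journal,
        str(issuedir.date.year),
        str(issuedir.date.month).zfill(2),
        str(issuedir.date.day).zfill(2),
        issuedir.edition,
    ]
    if suffix is not None:
        parts.append(suffix)
    # Assumption: the extension already contains its leading dot.
    return sep.join(parts) + (extension or "")


issue = IssueDirSketch("GDL", date(1950, 1, 2), "a")
issue_id = canonical_path_sketch(issue)
item_id = canonical_path_sketch(issue, suffix="i0002")
directory = canonical_path_sketch(issue, as_dir=True)
filename = canonical_path_sketch(issue, suffix="pages", extension=".jsonl.bz2")
```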

impresso_essentials.io.fs_utils.check_filenaming(file_basename: str, object_type: str = 'issue') Match[str] | None

Check whether a file’s basename complies with the naming convention.

Parameters:
  • file_basename (str) – Basename of file to check (excluding extension).

  • object_type (str, optional) – Type of objects in the given file. One of “issue”, “page”, “rebuilt”. Defaults to ‘issue’.

Returns:

The resulting match if correct, None otherwise.

Return type:

Match[str] | None

impresso_essentials.io.fs_utils.check_id(canonical_id: str, object_type: str = 'issue') Match[str] | None

Check whether a canonical ID complies with the naming convention.

Parameters:
  • canonical_id (str) – Canonical ID to check.

  • object_type (str, optional) – Object it corresponds to. One of “issue”, “page”, “content-item”. Defaults to ‘issue’.

Returns:

The resulting match if correct, None otherwise

Return type:

Match[str] | None

impresso_essentials.io.fs_utils.get_issueshortpath(issuedir: IssueDir) str

Return short version of an IssueDir’s path, starting from the journal.

Parameters:

issuedir (IssueDir) – IssueDir instance from which to get the short path.

Returns:

Canonical path to the issue starting at the journal name.

Return type:

str

impresso_essentials.io.fs_utils.glob_with_size(directory: str, file_suffix: str) list[str]

List all files in a directory with a given suffix and their size in MB.

Parameters:
  • directory (str) – The directory path to search for files.

  • file_suffix (str) – The file extension or suffix to match.

Returns:

A list of tuples, each containing the file path and its size in megabytes, rounded to six decimal places.

Return type:

list[str]
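The (path, size) pairs can be sketched with the standard library alone. glob_with_size_sketch is a hypothetical helper; treating one megabyte as 1024 * 1024 bytes and listing only the top level of the directory are both assumptions.

```python
import os
import tempfile


def glob_with_size_sketch(directory: str, file_suffix: str) -> list[tuple[str, float]]:
    """List files ending with file_suffix, paired with their size in megabytes."""
    results = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and name.endswith(file_suffix):
            # Assumption: 1 MB is counted as 1024 * 1024 bytes.
            size_mb = round(os.path.getsize(path) / (1024 * 1024), 6)
            results.append((path, size_mb))
    return results


with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "a.jsonl"), "wb") as handle:
        handle.write(b"x" * 1048576)  # exactly 1024 * 1024 bytes
    with open(os.path.join(tmp, "b.txt"), "wb") as handle:
        handle.write(b"ignored")
    found = glob_with_size_sketch(tmp, ".jsonl")
```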

impresso_essentials.io.fs_utils.list_local_directories(path: str) list[str]

List the directories present at a local path.

Parameters:

path (str) – Local path from which to list the directories.

Returns:

List of subdirectories in path.

Return type:

list[str]

impresso_essentials.io.fs_utils.parse_canonical_filename(filename: str) tuple[str, tuple, str, str, int, str]

Parse a canonical page or CI ID or filename into its components.

>>> filename = "GDL-1950-01-02-a-i0002"
>>> parse_canonical_filename(filename)
('GDL', ('1950', '01', '02'), 'a', 'i', 2, '')

Parameters:

filename (str) – ID or filename to parse.

Returns:

Parsed ID or filename.

Return type:

tuple[str, tuple, str, str, int, str]
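The decomposition shown in the example can be approximated with a regular expression. The pattern below is a hypothetical sketch rather than the one used by the library; it assumes a single lowercase edition letter, a type letter of “i” or “p” followed by four digits, and that the last tuple element holds whatever follows the number (such as a file extension).

```python
import re

# Hypothetical pattern: journal-YYYY-MM-DD-edition-<type letter><4-digit number><rest>
CANONICAL_RE = re.compile(
    r"^(?P<journal>[A-Za-z0-9_]+)-(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
    r"-(?P<edition>[a-z])-(?P<type>[ip])(?P<number>\d{4})(?P<rest>.*)$"
)


def parse_canonical_sketch(filename: str) -> tuple[str, tuple, str, str, int, str]:
    """Split a canonical page or content-item ID into its components."""
    match = CANONICAL_RE.match(filename)
    if match is None:
        raise ValueError(f"not a canonical page or content-item name: {filename!r}")
    return (
        match["journal"],
        (match["year"], match["month"], match["day"]),
        match["edition"],
        match["type"],
        int(match["number"]),
        match["rest"],
    )


parsed = parse_canonical_sketch("GDL-1950-01-02-a-i0002")
```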

impresso_essentials.io.fs_utils.parse_json(filename: str) dict[str, Any]

Load the contents of a JSON file.

Parameters:

filename (str) – Path to the json file.

Returns:

Resulting json, contained inside the file

Return type:

dict[str, Any]

Deleting keys from S3

Simple CLI script to delete keys from S3.

Usage:

impresso_commons/utils/s3_delete.py --bucket=<b> --prefix=<p>

Options:
--bucket=<b>

Target S3 bucket

--prefix=<p>

Prefix of keys to delete

impresso_essentials.io.s3_delete.delete_versioned_keys(client: BaseClient, bucket: str, prefix: str, max_keys: int = 1000)

Delete all the keys within a bucket based on a given prefix.

Parameters:
  • client (BaseClient) – S3 client.

  • bucket (str) – Name of the bucket to delete keys from.

  • prefix (str) – Prefix to the partition from which to delete keys.

  • max_keys (int, optional) – Max number of keys to delete at once. Defaults to 1000.
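The role of max_keys can be sketched as simple batching: keys are grouped so that no single delete request carries more than max_keys entries. batch_keys is a hypothetical helper that only prepares the groups; it does not call S3, and the {"Key": ...} entry shape is modelled on what boto3's delete_objects expects (versioned deletions would also carry a "VersionId").

```python
from typing import Iterator


def batch_keys(keys: list[str], max_keys: int = 1000) -> Iterator[list[dict]]:
    """Yield groups of at most max_keys entries, shaped like delete requests."""
    if max_keys < 1:
        raise ValueError("max_keys must be a positive integer")
    for start in range(0, len(keys), max_keys):
        yield [{"Key": key} for key in keys[start:start + max_keys]]


# Made-up keys, used for the demonstration only.
keys = [f"GDL/1950/file-{i:04d}.json" for i in range(2500)]
batches = list(batch_keys(keys))
batch_sizes = [len(batch) for batch in batches]
```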

impresso_essentials.io.s3_delete.main()