Input/Output
I/O from and to S3
Reusable functions to read/write data from/to our S3 drive.
- impresso_essentials.io.s3.IssueDir
alias of IssueDirectory
- impresso_essentials.io.s3.alternative_read_text(s3_key: str, s3_credentials: dict, line_by_line: bool = True) list[str] | str
Read from S3 a line-separated text file (e.g. *.jsonl.bz2).
- Note:
This function works around a bug in dask.bag.read_text(), which raises a FileNotFoundError on buckets holding 1000 or more keys.
- Parameters:
s3_key (str) – Full S3 path to the file to read.
s3_credentials (dict) – S3 credentials, IMPRESSO_STORAGEOPT.
line_by_line (bool, optional) – Whether to read the file line by line. Defaults to True.
- Returns:
Contents of the file, as a list of strings or as one string.
- Return type:
list[str] | str
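The effect of line_by_line can be illustrated without any S3 access. The following is a minimal sketch, not the library's implementation: read_bz2_text is a hypothetical helper that works on in-memory bytes and assumes the file is bz2-compressed UTF-8 text.

```python
import bz2
from typing import List, Union


def read_bz2_text(raw: bytes, line_by_line: bool = True) -> Union[List[str], str]:
    """Hypothetical helper: decompress bz2 bytes and return lines or one string."""
    text = bz2.decompress(raw).decode("utf-8")
    # splitlines() drops the trailing newline characters of each line.
    return text.splitlines() if line_by_line else text


# Build a tiny in-memory archive so the example runs offline.
archive = bz2.compress(b'{"id": "a"}\n{"id": "b"}\n')
lines = read_bz2_text(archive)
whole = read_bz2_text(archive, line_by_line=False)
```

With line_by_line=True the result is one string per line of the file; otherwise the whole content comes back as a single string.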
- impresso_essentials.io.s3.fetch_files(bucket_name: str, compute: bool = True, file_type: str = 'issues', newspapers_filter: list[str] | None = None) tuple[Bag | None, Bag | None] | tuple[list[str] | None, list[str] | None]
Fetch issue and/or page canonical JSON files from an s3 bucket.
If compute=True, the output will be a list of the contents of all files in the bucket for the specified newspapers and type of files. If compute=False, the output will remain in a distributed dask.bag.
Based on file_type, the issue files, page files or both will be returned. In the returned tuple, issues are always in the first element and pages in the second, hence if file_type is not ‘both’, the tuple entry corresponding to the undesired type of files will be None.
- Parameters:
bucket_name (str) – Name of the s3 bucket to fetch the files from.
compute (bool, optional) – Whether to compute result and output as list. Defaults to True.
file_type (str, optional) – Type of files to list, possible values are “issues”, “pages” and “both”. Defaults to “issues”.
newspapers_filter (list[str] | None, optional) – List of newspapers to consider. If None, all will be considered. Defaults to None.
- Raises:
NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘both’].
- Returns:
[0] Issue files’ contents or None and [1] Page files’ contents or None based on file_type
- Return type:
tuple[db.core.Bag|None, db.core.Bag|None] | tuple[list[str]|None, list[str]|None]
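The shape of the returned tuple can be sketched in plain Python. This is only an illustration of the contract described above, with a hypothetical helper name (demo_select) and placeholder lists standing in for the fetched contents.

```python
def demo_select(issues, pages, file_type="issues"):
    """Hypothetical helper mirroring the documented (issues, pages) convention."""
    if file_type not in ("issues", "pages", "both"):
        raise NotImplementedError(f"Unsupported file_type: {file_type!r}")
    # Issues always come first and pages second; the unwanted slot is None.
    return (
        issues if file_type in ("issues", "both") else None,
        pages if file_type in ("pages", "both") else None,
    )


only_issues = demo_select(["issue-1"], ["page-1"], file_type="issues")
both = demo_select(["issue-1"], ["page-1"], file_type="both")
```

Callers can therefore always unpack two values and check the relevant one for None.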
- impresso_essentials.io.s3.fixed_s3fs_glob(path: str, suffix: str | None = None, boto3_bucket=None) list[str]
Custom glob function able to list more than 1000 elements on s3 (fix of s3fs).
Note
path should be of the form “[partition]*[suffix or file extensions]”, with the partition potentially including the bucket name. If all files within the partition should be considered, regardless of their extension, “*” can be omitted. Conversely, path can be of the form “[partition]” if suffix is defined.
- Parameters:
path (str) – Glob path to the files, optionally including the bucket name. If the bucket name is not included, boto3_bucket should be defined.
suffix (str | None, optional) – Suffix or extension of the paths to consider within the bucket. Only used if “*” not found in path. Defaults to None.
boto3_bucket (boto3.resources.factory.s3.Bucket, optional) – S3 bucket to look into. Defaults to None.
- Returns:
List of filenames within the bucket corresponding to the provided path.
- Return type:
list[str]
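One way to read the path convention above is sketched below. The helper split_glob_path is hypothetical and makes two assumptions that the documentation does not confirm: the bucket name is included in the path, and an optional "s3://" scheme may precede it.

```python
def split_glob_path(path, suffix=None):
    """Hypothetical helper: split '[partition]*[suffix]' into (bucket, prefix, suffix)."""
    if path.startswith("s3://"):
        path = path[len("s3://"):]
    # Assumption: the first path component is the bucket name.
    bucket, _, rest = path.partition("/")
    if "*" in rest:
        # The suffix argument is only used when "*" is absent from the path.
        prefix, _, suffix = rest.partition("*")
    else:
        prefix = rest
    return bucket, prefix, suffix or ""


with_star = split_glob_path("s3://my-bucket/GDL/*.jsonl.bz2")
with_suffix = split_glob_path("my-bucket/GDL/", suffix=".json")
```

Both calls yield a key prefix to list and a suffix to filter on, which is the information a paginated listing would need.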
- impresso_essentials.io.s3.get_bucket(bucket_name: str)
Create a boto3 connection and return the desired bucket.
Note
This function does not ensure that the bucket exists. If this verification is necessary, please prefer using get_or_create_bucket() instead.
- Parameters:
bucket_name (str) – Name of the S3 bucket to use.
- Returns:
Desired S3 bucket.
- Return type:
boto3.resources.factory.s3.Bucket
- impresso_essentials.io.s3.get_or_create_bucket(name: str, create: bool = False)
Create a boto3 s3 connection and create or return the requested bucket.
It is possible to ask for a new bucket with the specified name to be created (in case it does not exist):
>>> b = get_or_create_bucket('testb', create=False)
>>> b = get_or_create_bucket('testb', create=True)
- Parameters:
name (str) – Name of the bucket to get or create.
create (bool, optional) – Whether to create the bucket if it doesn’t exist. Defaults to False.
- Returns:
S3 bucket, fetched or created.
- Return type:
boto3.resources.factory.s3.Bucket
- impresso_essentials.io.s3.get_s3_client(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') BaseClient
Create S3 boto3 client using environment variables from local .env files.
Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.
- Parameters:
host_url (str | None, optional) – URL of the S3 host. Defaults to “https://os.zhdk.cloud.switch.ch/”.
- Raises:
e – Argument host_url was not provided and SE_HOST_URL was not in the env.
e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.
- Returns:
The S3 boto3 client.
- Return type:
BaseClient
- impresso_essentials.io.s3.get_s3_object_size(bucket_name: str, key: str) int
Get the size of an object (key) in an S3 bucket.
- Parameters:
bucket_name (str) – The name of the S3 bucket.
key (str) – The key (object) whose size you want to retrieve.
- Returns:
The size of the object in bytes, or None if the object doesn’t exist.
- Return type:
int
- impresso_essentials.io.s3.get_s3_resource(host_url: str | None = 'https://os.zhdk.cloud.switch.ch/') ServiceResource
Get a boto3 resource object related to an S3 drive.
Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.
- Parameters:
host_url (str | None, optional) – URL of the S3 host. Defaults to “https://os.zhdk.cloud.switch.ch/”.
- Raises:
e – Argument host_url was not provided and SE_HOST_URL was not in the env.
e – SE_ACCESS_KEY or SE_SECRET_KEY was not in the environment variables.
- Returns:
S3 resource associated to the endpoint.
- Return type:
ServiceResource
- impresso_essentials.io.s3.get_storage_options() dict[str, dict | str]
Load environment variables from local .env files.
Assumes that two environment variables are set: SE_ACCESS_KEY and SE_SECRET_KEY.
- Returns:
Credentials to access an S3 endpoint.
- Return type:
dict[str, dict | str]
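The following sketch shows what such a credentials dictionary could look like. It is an assumption, not the library's actual output: the layout (key, secret, client_kwargs with endpoint_url) follows the s3fs storage-options convention, the helper name demo_storage_options is hypothetical, and the choice of KeyError for missing variables is illustrative only.

```python
def demo_storage_options(env, default_host="https://os.zhdk.cloud.switch.ch/"):
    """Hypothetical helper: build s3fs-style storage options from a mapping."""
    missing = [name for name in ("SE_ACCESS_KEY", "SE_SECRET_KEY") if name not in env]
    if missing:
        # Assumption: the real function may raise a different exception type.
        raise KeyError(f"Missing environment variables: {missing}")
    return {
        "client_kwargs": {"endpoint_url": env.get("SE_HOST_URL", default_host)},
        "key": env["SE_ACCESS_KEY"],
        "secret": env["SE_SECRET_KEY"],
    }


# A plain dict stands in for os.environ so the example has no side effects.
options = demo_storage_options({"SE_ACCESS_KEY": "demo-key", "SE_SECRET_KEY": "demo-secret"})
```

In real use the mapping would be os.environ after the .env file has been loaded.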
- impresso_essentials.io.s3.list_files(bucket_name: str, file_type: str = 'issues', newspapers_filter: list[str] | None = None) tuple[list[str] | None, list[str] | None]
List the canonical files located in a given S3 bucket.
- Parameters:
bucket_name (str) – S3 bucket name.
file_type (str, optional) – Type of files to list, possible values are “issues”, “pages” and “both”. Defaults to “issues”.
newspapers_filter (list[str] | None, optional) – List of newspapers to consider. If None, all will be considered. Defaults to None.
- Raises:
NotImplementedError – The given file_type is not one of [‘issues’, ‘pages’, ‘both’].
- Returns:
[0] List of issue files or None and [1] List of page files or None based on file_type
- Return type:
tuple[list[str] | None, list[str] | None]
- impresso_essentials.io.s3.list_newspapers(bucket_name: str, s3_client=<botocore.client.S3 object>, page_size: int = 10000) list[str]
List newspapers contained in an s3 bucket with impresso data.
Note
25,000 seems to be the maximum PageSize value supported by SwitchEngines’ S3 implementation (ceph).
Note
Copied from https://github.com/impresso/impresso-data-sanitycheck/tree/master/sanity_check/contents/s3_data.py
- Parameters:
bucket_name (str) – Name of the S3 bucket to consider
s3_client (optional) – S3 client to use. Defaults to get_s3_client().
page_size (int, optional) – Pagination configuration. Defaults to 10000.
- Returns:
List of newspaper (aliases) present in the given S3 bucket.
- Return type:
list[str]
- impresso_essentials.io.s3.list_s3_directories(bucket_name: str, prefix: str = '') list[str]
Retrieve ‘directory’ names (media titles) in an S3 bucket given a path prefix.
- Parameters:
bucket_name (str) – The name of the S3 bucket.
prefix (str) – The prefix path within the bucket to search. Default is the root (‘’).
- Returns:
A list of ‘directory’ names found in the specified bucket and prefix.
- Return type:
list
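S3 has no real directories, so ‘directory’ names are derived from key prefixes. The sketch below emulates that idea on a plain list of keys; directories_under is a hypothetical helper, and the real function presumably delegates the grouping to the S3 listing API rather than doing it client-side.

```python
def directories_under(keys, prefix=""):
    """Hypothetical helper: first-level 'directory' names below a key prefix."""
    dirs = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        # Keep the first path component after the prefix, if a "/" follows it.
        head, sep, _ = key[len(prefix):].partition("/")
        if sep and head:
            dirs.add(head)
    return sorted(dirs)


keys = [
    "GDL/issues/GDL-1950-issues.jsonl.bz2",
    "JDG/issues/JDG-1950-issues.jsonl.bz2",
    "README.txt",
]
top_level = directories_under(keys)
below_gdl = directories_under(keys, prefix="GDL/")
```

Keys without a "/" after the prefix, such as README.txt, are files at that level and are not reported.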
- impresso_essentials.io.s3.read_jsonlines(key_name: str, bucket_name: str) Generator
Given the S3 key of a jsonl.bz2 archive, extract and return its lines.
Usage example:
>>> lines = db.from_sequence(read_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('id').take(10)
- Parameters:
key_name (str) – S3 key, without S3 prefix, but with partitions within.
bucket_name (str) – Name of S3 bucket to use.
- Raises:
ValueError – The provided key_name does not exist in the provided bucket.
- Yields:
Generator – generator yielding lines within the archive one by one.
- impresso_essentials.io.s3.read_s3_issues(newspaper: str, year: str, input_bucket: str) list[tuple[IssueDirectory, dict]]
Read the contents of canonical issues from a given S3 bucket.
- Parameters:
newspaper (str) – Name of the newspaper to read the issues from.
year (str) – Target year to read issues from.
input_bucket (str) – Bucket from where to fetch the issues.
- Returns:
List of IssueDirs and the issues’ contents.
- Return type:
list[tuple[IssueDir, dict]]
- impresso_essentials.io.s3.readtext_jsonlines(key_name: str, bucket_name: str, fields_to_keep: list[str] | None = None) Generator
Given the S3 key of a jsonl.bz2 archive, return its lines textual information.
Only the provided fields (or default ones) will be kept in the returned lines. By default, fields_to_keep = [“id”, “pp”, “ts”, “lg”, “tp”, “t”, “ft”].
This can serve as the starting point for pure textual processing. Usage example:
>>> lines = db.from_sequence(readtext_jsonlines(key_name, bucket_name))
>>> lines.map(json.loads).pluck('ft').take(10)
- Parameters:
key_name (str) – S3 key, without S3 prefix, but with partitions within.
bucket_name (str) – Name of S3 bucket to use.
fields_to_keep (list[str] | None, optional) – Fields to keep in the returned lines. If None, the default fields are used. Defaults to None.
- Raises:
ValueError – The provided key_name does not exist in the provided bucket.
- Yields:
Generator – generator yielding reformatted lines within the archive one by one.
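The field filtering can be sketched on an in-memory archive. This is a minimal illustration under stated assumptions: iter_text_lines is a hypothetical helper, it takes raw bz2 bytes instead of an S3 key, and it yields JSON strings, which is inferred from the usage example applying json.loads to each line.

```python
import bz2
import json

# Default fields as listed in the documentation above.
DEFAULT_FIELDS = ["id", "pp", "ts", "lg", "tp", "t", "ft"]


def iter_text_lines(raw, fields_to_keep=None):
    """Hypothetical helper: yield each JSON line reduced to the kept fields."""
    fields = fields_to_keep or DEFAULT_FIELDS
    for line in bz2.decompress(raw).decode("utf-8").splitlines():
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)
        yield json.dumps({k: record[k] for k in fields if k in record})


archive = bz2.compress(
    b'{"id": "GDL-1950-01-02-a-i0002", "ft": "some text", "extra": 1}\n'
)
reduced = [json.loads(line) for line in iter_text_lines(archive)]
```

Fields that are not in the kept list, such as "extra" here, are dropped from every yielded line.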
- impresso_essentials.io.s3.s3_glob_with_size(path: str, boto3_bucket=None)
Custom glob function to list S3 objects matching a pattern. This function works around the 1000-object listing limit in S3 by using boto3 directly.
- Parameters:
path (str) – The S3 path with a wildcard (*) to match files. Example: s3://bucket_name/path/to/files/*.txt.
boto3_bucket (boto3.Bucket, optional) – An optional boto3 Bucket object. If not provided, it will be created from the path.
- Returns:
A list of tuples containing the full S3 paths of matching files and their sizes in megabytes.
- Return type:
list
- impresso_essentials.io.s3.s3_iter_bucket(bucket_name: str, prefix: str = '', suffix: str = '', accept_key: ~typing.Callable[[str], bool] = <function <lambda>>) list
Iterate over a bucket, returning all keys with some filtering options.
>>> k = s3_iter_bucket("myBucket", prefix='GDL', suffix=".bz2")
>>> k = s3_iter_bucket("myBucket", prefix='GDL', accept_key=lambda x: "page" in x)
Note
If suffix is not “”, the used accepting condition will become: lambda key: accept_key(key) and key.endswith(suffix)
- Parameters:
bucket_name (str) – Name of the S3 bucket to list the contents of
prefix (str, optional) – Partition prefix to filter bucket’s keys. Defaults to “”.
suffix (str, optional) – Suffix to filter the bucket’s keys. Defaults to “”.
accept_key (Callable[[str], bool], optional) – Filtering condition for the keys as a lambda function. Defaults to lambda k: True.
- Returns:
List of keys corresponding to the provided prefix, suffix and accept key.
- Return type:
list
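How suffix and accept_key combine can be shown on a plain list of keys. This is a sketch of the documented condition only; filter_keys is a hypothetical helper and does not talk to S3.

```python
def filter_keys(keys, prefix="", suffix="", accept_key=lambda k: True):
    """Hypothetical helper applying the documented prefix/suffix/accept_key logic."""
    if suffix != "":
        base = accept_key
        # Combined condition from the note above.
        accept = lambda key: base(key) and key.endswith(suffix)
    else:
        accept = accept_key
    return [k for k in keys if k.startswith(prefix) and accept(k)]


keys = [
    "GDL/pages/GDL-1950-pages.jsonl.bz2",
    "GDL/issues/GDL-1950-issues.jsonl.bz2",
    "GDL/notes.txt",
    "JDG/pages/JDG-1950-pages.jsonl.bz2",
]
bz2_keys = filter_keys(keys, prefix="GDL", suffix=".bz2")
page_keys = filter_keys(keys, prefix="GDL", suffix=".bz2", accept_key=lambda x: "page" in x)
```

A key is kept only if it passes both tests, so a custom accept_key narrows the suffix filter rather than replacing it.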
- impresso_essentials.io.s3.upload_to_s3(local_path: str, path_within_bucket: str, bucket_name: str) bool
Upload a file to an S3 bucket.
- Parameters:
local_path (str) – The local file path to upload.
path_within_bucket (str) – The path within the bucket where the file will be uploaded.
bucket_name (str) – The name of the S3 bucket (without any partitions).
- Returns:
True if the upload is successful, False otherwise.
- Return type:
bool
I/O from and to file system
Code for parsing impresso’s canonical directory structures.
- impresso_essentials.io.fs_utils.canonical_path(issuedir: IssueDir, suffix: str = None, extension: str = None, as_dir: bool = False) str
Create a canonical dir, filename or ID from an IssueDir object.
- Parameters:
issuedir (IssueDir) – IssueDir object to create the path for
suffix (str, optional) – Suffix to use which will follow the issue ID. eg. Can be ‘pages’, ‘i0001’ or “p” + str(num).zfill(4). Defaults to None.
extension (str, optional) – File extension to use if creating a filename. Defaults to None.
as_dir (bool, optional) – Whether the result is a directory (‘/’ separator) or a filename or ID (‘-’ separator). Defaults to False.
- Returns:
Constructed canonical ID, filename or canonical path for given IssueDir.
- Return type:
str
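The effect of suffix and as_dir can be sketched as follows. Everything here is an assumption made for illustration: DemoIssueDir is a hypothetical stand-in whose field names (journal, date, edition) may differ from the real IssueDir, and extension handling is left out.

```python
from collections import namedtuple
from datetime import date

# Hypothetical stand-in for IssueDir; the real field names may differ.
DemoIssueDir = namedtuple("DemoIssueDir", ["journal", "date", "edition"])


def demo_canonical_path(issuedir, suffix=None, as_dir=False):
    """Hypothetical helper: join the issue components with '/' or '-'."""
    sep = "/" if as_dir else "-"
    parts = [
        issuedir.journal,
        f"{issuedir.date.year:04d}",
        f"{issuedir.date.month:02d}",
        f"{issuedir.date.day:02d}",
        issuedir.edition,
    ]
    if suffix:
        parts.append(suffix)
    return sep.join(parts)


issue = DemoIssueDir("GDL", date(1950, 1, 2), "a")
issue_id = demo_canonical_path(issue)
item_id = demo_canonical_path(issue, suffix="i0002")
issue_dir = demo_canonical_path(issue, as_dir=True)
```

The same components give an ID such as GDL-1950-01-02-a or a directory path such as GDL/1950/01/02/a, depending on as_dir.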
- impresso_essentials.io.fs_utils.check_filenaming(file_basename: str, object_type: str = 'issue') Match[str] | None
Check whether a file’s basename complies with the naming convention.
- Parameters:
file_basename (str) – Basename of file to check (excluding extension).
object_type (str, optional) – Type of objects in the given file. One of “issue”, “page”, “rebuilt”. Defaults to ‘issue’.
- Returns:
The resulting match if correct, None otherwise.
- Return type:
Match[str] | None
- impresso_essentials.io.fs_utils.check_id(canonical_id: str, object_type: str = 'issue') Match[str] | None
Check whether a canonical ID complies with the naming convention.
- Parameters:
canonical_id (str) – Canonical ID to check.
object_type (str, optional) – Object it corresponds to. One of “issue”, “page”, “content-item”. Defaults to ‘issue’.
- Returns:
The resulting match if correct, None otherwise
- Return type:
Match[str] | None
- impresso_essentials.io.fs_utils.get_issueshortpath(issuedir: IssueDir) str
Return short version of an IssueDir’s path, starting from the journal.
- Parameters:
issuedir (IssueDir) – IssueDir instance from which to get the short path.
- Returns:
Canonical path to the issue starting at the journal name.
- Return type:
str
- impresso_essentials.io.fs_utils.glob_with_size(directory: str, file_suffix: str) list[str]
List all files in a directory with a given suffix and their size in MB.
- Parameters:
directory (str) – The directory path to search for files.
file_suffix (str) – The file extension or suffix to match.
- Returns:
A list of tuples, each containing the file path and its size in megabytes, rounded to six decimal places.
- Return type:
list[str]
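A minimal sketch of the listing described above, using only the standard library. demo_glob_with_size is a hypothetical helper; it assumes a non-recursive search and that one megabyte means 1024 * 1024 bytes, neither of which the documentation states.

```python
import os
import tempfile


def demo_glob_with_size(directory, file_suffix):
    """Hypothetical helper: (path, size in MB) for files ending with file_suffix."""
    results = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and name.endswith(file_suffix):
            # Assumption: 1 MB = 1024 * 1024 bytes.
            size_mb = round(os.path.getsize(path) / (1024 * 1024), 6)
            results.append((path, size_mb))
    return results


with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "a.jsonl.bz2"), "wb") as handle:
        handle.write(b"x" * (1024 * 1024))  # exactly 1 MB
    with open(os.path.join(tmp, "b.txt"), "wb") as handle:
        handle.write(b"ignored")
    found = demo_glob_with_size(tmp, ".bz2")
    names = [os.path.basename(path) for path, _ in found]
    sizes = [size for _, size in found]
```

Each entry pairs a path with its size, so the result is a list of tuples rather than a list of plain strings.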
- impresso_essentials.io.fs_utils.list_local_directories(path: str) list[str]
List the directories present at a local path.
- Parameters:
path (str) – Local path from which to list the directories.
- Returns:
List of subdirectories in path.
- Return type:
list[str]
- impresso_essentials.io.fs_utils.parse_canonical_filename(filename: str) tuple[str, tuple, str, str, int, str]
Parse a canonical page or CI ID or filename into its components.
>>> filename = "GDL-1950-01-02-a-i0002"
>>> parse_canonical_filename(filename)
('GDL', ('1950', '01', '02'), 'a', 'i', 2, '')
- Parameters:
filename (str) – ID or filename to parse.
- Returns:
Parsed ID or filename.
- Return type:
tuple[str, tuple, str, str, int, str]
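The decomposition in the example above can be reproduced with a regular expression. This is a sketch, not the library's parser: the pattern and the name demo_parse are assumptions, and only the single-letter type markers i and p followed by four digits are handled.

```python
import re

# Assumed pattern: title-YYYY-MM-DD-edition-<type><number><extension>
PATTERN = re.compile(
    r"^(?P<np>[A-Za-z0-9_]+)-(?P<y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})"
    r"-(?P<ed>[a-z])-(?P<type>[ip])(?P<num>\d{4})(?P<ext>.*)$"
)


def demo_parse(filename):
    """Hypothetical helper: split a canonical page or CI ID into components."""
    match = PATTERN.match(filename)
    if match is None:
        raise ValueError(f"Not a canonical page or CI ID: {filename!r}")
    return (
        match["np"],
        (match["y"], match["m"], match["d"]),
        match["ed"],
        match["type"],
        int(match["num"]),
        match["ext"],
    )


parsed_ci = demo_parse("GDL-1950-01-02-a-i0002")
parsed_page = demo_parse("GDL-1950-01-02-a-p0003.json")
```

The date stays a tuple of strings while the item number becomes an int, matching the documented return type.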
- impresso_essentials.io.fs_utils.parse_json(filename: str) dict[str, Any]
Load the contents of a JSON file.
- Parameters:
filename (str) – Path to the json file.
- Returns:
Resulting json, contained inside the file
- Return type:
dict[str, Any]
Deleting keys from S3
Simple CLI script to delete keys from S3.
- Usage:
impresso_commons/utils/s3_delete.py --bucket=<b> --prefix=<p>
- Options:
- --bucket=<b>
Target S3 bucket
- --prefix=<p>
Prefix of keys to delete
- impresso_essentials.io.s3_delete.delete_versioned_keys(client: BaseClient, bucket: str, prefix: str, max_keys: int = 1000)
Delete all the keys within a bucket based on a given prefix.
- Parameters:
client (BaseClient) – S3 client.
bucket (str) – Name of the bucket to delete keys from.
prefix (str) – Prefix to the partition from which to delete keys.
max_keys (int, optional) – Max number of keys to delete at once. Defaults to 1000.
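The role of max_keys can be illustrated by splitting a key list into batches. This sketch only shows the batching; batched is a hypothetical helper, no deletion is performed, and whether the real function batches in exactly this way is an assumption. The default of 1000 matches the maximum number of keys S3 accepts in one multi-object delete request.

```python
def batched(keys, max_keys=1000):
    """Hypothetical helper: yield consecutive slices of at most max_keys keys."""
    if max_keys < 1:
        raise ValueError("max_keys must be a positive integer")
    for start in range(0, len(keys), max_keys):
        yield keys[start:start + max_keys]


keys = [f"GDL/pages/key-{i:04d}" for i in range(2500)]
batch_sizes = [len(batch) for batch in batched(keys)]
```

Each batch would then be sent as one delete request for the keys sharing the given prefix.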
- impresso_essentials.io.s3_delete.main()