Data Versioning
The versioning package of impresso_essentials contains several modules and scripts with classes and functions that allow Impresso’s data to be versioned at various stages of the processing pipeline.
The main goal of this approach is to version the data and track information at every stage to:
Ensure data consistency and ease of debugging: Data elements should be consistent across stages, and inconsistencies/differences should be justifiable through the identification of data leakage points.
Allow partial updates: It should be possible to (re)run all or part of the processes on subsets of the data, knowing which version of the data was used at each step. This can be necessary when new media collections arrive, or when an existing collection has been patched.
Ensure transparency: Citation of the various data stages and datasets should be straightforward; users should know when using the interface exactly what versions they are using, and should be able to consult the precise statistics related to them.
Data Statistics and Newspaper Statistics
This module contains the definition of a data statistics class.
A DataStatistics object should be instantiated during each processing step of the data preprocessing and augmentation of the Impresso project, and used to progressively count the number of elements modified or added by the processing.
- class impresso_essentials.versioning.data_statistics.DataStatistics(data_stage: DataStage | str, granularity: str, element: str | None = None, source_medium: SourceMedium | str | None = None, provider: str | None = None, counts: dict[str, int | dict[str, int]] | None = None)
Bases: ABC
Count statistics computed on a specific portion and granularity of the data.
- Parameters:
data_stage (DataStage | str) – The stage of data the stats are computed on.
granularity (str) – The granularity of the statistics with respect to the data.
element (str, optional) – The specific element associated with the statistics. Defaults to “” (empty string).
counts (dict[str, int | dict[str, int]] | None, optional) – Initial counts for statistics. Defaults to None.
- granularity
The granularity of the statistics with respect to the data.
- Type:
str
- element
The specific element associated with the statistics.
- Type:
str
- count_keys
The count keys for these statistics.
- Type:
list[str]
- counts
The count statistics computed on the specific data, can include frequency dicts.
- Type:
dict[str, int | dict[str, int]]
- add_counts(new_counts: dict[str, int | dict[str, int]], replace: bool = False) bool
Add new counts to the existing counts if the new keys are validated.
- Parameters:
new_counts (dict[str, int | dict[str, int]]) – New counts to be added.
- Returns:
True if the counts were valid and could be added, False otherwise.
- Return type:
bool
- init_counts() dict[str, int | dict[str, int]]
Initialize a dict with all the keys associated to this object.
- Returns:
A dict with all defined keys, and values initialized to 0 (or to empty frequency dicts).
- Return type:
dict[str, int | dict[str, int]]
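To illustrate how init_counts and add_counts could work together, here is a minimal stand-alone sketch, not the library's implementation. It assumes that frequency-dict keys are the ones ending in "_fd" (as in lang_fd or topics_fd), and that replace=True overwrites the current values; both are assumptions, since the parameter is not documented above.

```python
def init_counts(count_keys: list[str]) -> dict:
    """Start plain counts at 0 and frequency-dict keys (assumed: suffix "_fd") at {}."""
    return {key: {} if key.endswith("_fd") else 0 for key in count_keys}


def add_counts(counts: dict, new_counts: dict, replace: bool = False) -> bool:
    """Merge new_counts into counts in place; reject any key not defined for this object."""
    if not set(new_counts).issubset(counts):
        return False
    for key, value in new_counts.items():
        if replace:
            # Assumed semantics of replace=True: overwrite the current value.
            counts[key] = value
        elif isinstance(value, dict):
            # Frequency dicts are merged by summing the per-item counts.
            for item, n in value.items():
                counts[key][item] = counts[key].get(item, 0) + n
        else:
            counts[key] += value
    return True


counts = init_counts(["issues", "pages", "lang_fd"])
add_counts(counts, {"issues": 2, "lang_fd": {"fr": 3}})
add_counts(counts, {"issues": 1, "lang_fd": {"fr": 1, "de": 2}})
print(counts)  # {'issues': 3, 'pages': 0, 'lang_fd': {'fr': 4, 'de': 2}}
```

Rejecting unknown keys up front keeps a typo in a count key from silently creating a new statistic.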
- pretty_print(modif_date: str | None = None, include_counts: bool = False) dict[str, Any]
Generate a dict representation of these statistics to add to a json.
These stats are agnostic to the type of statistics they represent, so the values of self.counts are excluded by default and are meant to be included in child classes. The modification date can also be included (when granularity=’year’).
- Parameters:
modif_date (Optional[str], optional) – Last modification date of the corresponding elements. Defaults to None.
include_counts (bool, optional) – Whether to include the current counts with key “stats”. Defaults to False.
- Returns:
A dict with the general information about these statistics.
- Return type:
dict[str, Any]
- abstractmethod same_counts(other_stats: dict[str, Any] | Self) bool
Given another dict of stats, check whether the values are the same.
- class impresso_essentials.versioning.data_statistics.MediaStatistics(data_stage: DataStage | str, granularity: str, element: str | None = None, source_medium: SourceMedium | str | None = None, provider: str | None = None, counts: dict[str, int | dict[str, int]] | None = None)
Bases: DataStatistics
Count statistics computed on a specific portion and granularity of the data.
- Parameters:
data_stage (DataStage | str) – The stage of data the stats are computed on.
granularity (str) – The granularity of the statistics with respect to the data.
element (str, optional) – The specific element associated with the statistics. Defaults to “” (empty string).
counts (dict[str, int] | None, optional) – Initial counts for statistics. Defaults to None.
- granularity
The granularity of the statistics with respect to the data.
- Type:
str
- element
The specific element associated with the statistics.
- Type:
str
- count_keys
The count keys for these statistics.
- Type:
list[str]
- counts
The count statistics computed on the specific data.
- Type:
dict[str, int]
- possible_count_keys = ['titles', 'issues', 'pages', 'audios', 'content_items_out', 'reocred_cis', 'ft_tokens', 'images', 'ne_mentions', 'ne_entities', 'embeddings_el', 'topics', 'topics_fd', 'lang_fd', 'text_reuse_clusters', 'text_reuse_passages', 'avg_ocrqa', 'img_level0_class_fd', 'img_level1_class_fd', 'img_level2_class_fd', 'img_level3_class_fd']
- pretty_print(modif_date: str | None = None, include_counts: bool = True) dict[str, Any]
Generate a dict representation of these statistics to add to a json.
- Parameters:
modif_date (Optional[str], optional) – Last modification date of the corresponding elements. Defaults to None.
include_counts (bool, optional) – Whether to include the current media counts with key “media_stats”. Defaults to True.
- Returns:
A dict representation of these statistics.
- Return type:
dict[str, Any]
- same_counts(other_stats: dict[str, Any] | Self) bool
Given another dict of stats, check whether the values are the same.
- Parameters:
other_stats (Union[dict[str, Any], Self]) – Dict with pretty-printed media statistics or other MediaStatistics object.
- Returns:
True if the values for the various fields of media_stats were the same, False otherwise.
- Return type:
bool
- stage_extra_keys = {DataStage.CANONICAL: ['pages', 'audios', 'images'], DataStage.CAN_CONSOLIDATED: ['pages', 'audios', 'images', 'reocred_cis', 'lang_fd'], DataStage.CLASSIF_IMAGES: ['images', 'img_level0_class_fd', 'img_level1_class_fd', 'img_level2_class_fd', 'img_level3_class_fd'], DataStage.EMB_DOCS: [], DataStage.EMB_IMAGES: ['images'], DataStage.ENTITIES: ['ne_entities', 'ne_mentions'], DataStage.LANGIDENT_OCRQA: ['images', 'lang_fd', 'avg_ocrqa'], DataStage.LANGIDENT: ['images', 'lang_fd'], DataStage.LINGPROC: [], DataStage.MYSQL_CIS: ['pages', 'audios'], DataStage.NEWS_AGENCIES: ['ne_entities', 'ne_mentions'], DataStage.OCRQA: ['avg_ocrqa'], DataStage.PASSIM: ['ft_tokens'], DataStage.REBUILT: ['ft_tokens'], DataStage.SOLR_TEXT: ['ft_tokens'], DataStage.TEXT_REUSE: ['text_reuse_clusters', 'text_reuse_passages'], DataStage.TOPICS: ['topics', 'topics_fd']}
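The sketch below shows one way the stage-specific keys could be combined and checked against possible_count_keys. Only three entries of stage_extra_keys are copied, with plain strings standing in for the DataStage members, and BASE_KEYS (keys shared by every stage) is a hypothetical placeholder: the list of base keys is not given in this documentation.

```python
POSSIBLE_COUNT_KEYS = [
    "titles", "issues", "pages", "audios", "content_items_out", "reocred_cis",
    "ft_tokens", "images", "ne_mentions", "ne_entities", "embeddings_el",
    "topics", "topics_fd", "lang_fd", "text_reuse_clusters", "text_reuse_passages",
    "avg_ocrqa", "img_level0_class_fd", "img_level1_class_fd",
    "img_level2_class_fd", "img_level3_class_fd",
]

# Excerpt of stage_extra_keys; the string keys are placeholders for DataStage members.
STAGE_EXTRA_KEYS = {
    "canonical": ["pages", "audios", "images"],
    "entities": ["ne_entities", "ne_mentions"],
    "text_reuse": ["text_reuse_clusters", "text_reuse_passages"],
}

# Hypothetical: keys assumed to be counted at every stage.
BASE_KEYS = ["titles", "issues", "content_items_out"]


def count_keys_for(stage: str) -> list[str]:
    """Combine the base keys with the stage's extra keys and validate them."""
    keys = BASE_KEYS + STAGE_EXTRA_KEYS.get(stage, [])
    unknown = [key for key in keys if key not in POSSIBLE_COUNT_KEYS]
    if unknown:
        raise ValueError(f"Unknown count keys for {stage!r}: {unknown}")
    return keys


print(count_keys_for("entities"))
```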
Data Manifest
This module contains the definition of a manifest class.
A manifest object should be instantiated for each processing step of the data preprocessing and augmentation of the Impresso project.
- class impresso_essentials.versioning.data_manifest.DataManifest(data_stage: DataStage | str, s3_output_bucket: str, git_repo: str | Repo, temp_dir: str, s3_input_bucket: str | None = None, new_version: str | None = None, is_patch: bool | None = False, patched_fields: dict[str, list[str]] | list[str] | None = None, previous_mft_path: str | None = None, only_counting: bool | None = False, notes: str | None = None, push_to_git: bool | None = None, relative_git_path: str | None = None, model_id: str | None = '', run_id: str | None = '')
Bases: object
- add_by_ci_id(ci_id: str, counts: dict[str, int], src_medium: str | None = None, provider: str | None = None) bool
Add new counts corresponding to a specific content-item ID.
- Parameters:
ci_id (str) – Content-item canonical ID to which the counts correspond.
counts (dict[str, int]) – Counts corresponding to that ID.
- Returns:
True if the processing stats’ update was successful, False otherwise.
- Return type:
bool
- add_by_title_year(title: str, year: str, counts: dict[str, int], src_medium: str | None = None, provider: str | None = None) bool
Add new counts corresponding to a specific media title and year.
- Parameters:
title (str) – Media title to which the counts correspond.
year (str) – Year to which the counts correspond.
counts (dict[str, int]) – Counts corresponding to that title and year.
- Returns:
True if the processing stats’ update was successful, False otherwise.
- Return type:
bool
- add_count_list_by_title_year(title: str, year: str, all_counts: list[dict[str, int]], src_medium: str | None = None, provider: str | None = None) bool
Add a list of new counts corresponding to a specific media title and year.
- Parameters:
title (str) – Media title to which the counts correspond.
year (str) – Year to which the counts correspond.
all_counts (list[dict[str, int]]) – List of counts for that title and year.
- Returns:
True if all the updates were successful, False otherwise.
- Return type:
bool
- aggregate_stats_for_title(title: str, media_dict: dict[str, Any]) tuple[dict[str, Any], MediaStatistics]
Aggregate all stats of given title and export them to a “pretty print” dict.
The DataStatistics objects don’t display in the dict format by default, but need to be converted to dicts to show as desired on the final manifest.
- Parameters:
title (str) – Media title for which to aggregate the yearly stats.
media_dict (dict[str, Any]) – Title’s media dict with formatted statistics.
- Returns:
Updated media dict and corresponding title-level DataStatistics object.
- Return type:
tuple[dict[str, Any], MediaStatistics]
- append_to_notes(contents: str, to_start: bool = True) None
Append a string content to the manifest notes, initialize them if needed.
- Parameters:
contents (str) – Text to add to the manifest notes.
to_start (bool, optional) – Whether the contents should be added to the start of the notes instead of the end. Defaults to True.
- compute(export_to_git_and_s3: bool = True, commit_msg: str | None = None) None
Perform all necessary logic to compute and construct the resulting manifest.
This lazy behavior ensures all necessary information is ready and accessible when generating the manifest (in particular the _processing_stats).
- The steps of this computation are the following:
Ensure _processing_stats is not empty so the manifest can be computed and crystallize the time this function is called as the _generation_date.
Fetch the previous version of this manifest from S3, extract its media list.
Generate the new media list given the previous one and _processing_stats.
Compute the new title and corpus level statistics using the new media list.
Compute the new version based on the performed updates.
Define the manifest_data attribute corresponding to the final manifest.
Optionally, dump it to JSON, export it to S3 and Git.
- Parameters:
export_to_git_and_s3 (bool, optional) – Whether to export the final manifest_data as JSON to S3 and GitHub. Defaults to True. If False, validate_and_export_manifest can be called separately to do it.
commit_msg (Optional[str], optional) – Commit message to use instead of the default from validate_and_export_manifest. Defaults to None.
- define_update_info_for_title(processed_years: set[str], prev_version_years: set[str]) dict[str, str | list]
Define a title’s update info from the previous and newly updated years.
The update information for a given title corresponds to four keys, for which the values provide information about what modifications took place during the processing this manifest is documenting.
- They are defined based on various values:
self.patched_fields: fields updated during the processing (e.g. for a patch).
processed_years and prev_version_years
- Four cases exist:
- All newly processed years were in the previous version
-> full title update, only modification.
- Part of the previous years were updated, and no newly added years:
-> year-specific update, where all modified years will be listed.
- All previous years were updated, and new years were added:
-> full title update with addition.
- Part of the previous years were updated, and new years were added:
-> year-specific update, with addition.
- Parameters:
processed_years (set[str]) – Years for which statistics were computed for this manifest.
prev_version_years (set[str]) – Years for which statistics have already been computed for the previous version of this manifest.
- Returns:
New update info dict for the given title.
- Return type:
dict[str, Union[str, list]]
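The four cases above reduce to two set comparisons: whether any processed year is new (addition vs. modification) and whether every previous year was reprocessed (title vs. year level). The following is a minimal sketch of that logic, not the library's code. The key names follow new_media below; the value "modification" and the choice to list all processed years for year-level updates are assumptions.

```python
from typing import Optional


def define_update_info(
    processed_years: set[str],
    prev_version_years: set[str],
    patched_fields: Optional[list[str]] = None,
) -> dict:
    added_years = processed_years - prev_version_years
    # Full-title update when every year of the previous version was reprocessed.
    full_title = prev_version_years <= processed_years
    return {
        "update_type": "addition" if added_years else "modification",
        "update_level": "title" if full_title else "year",
        # Year-level updates list the modified years; title-level ones leave it empty.
        "updated_years": [] if full_title else sorted(processed_years),
        "updated_fields": list(patched_fields or []),
    }


print(define_update_info({"1900"}, {"1900", "1901"}))
# {'update_type': 'modification', 'update_level': 'year', 'updated_years': ['1900'], 'updated_fields': []}
```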
- generate_media_dict(old_media_list: dict[str, dict]) tuple[dict, bool]
Given the previous manifest’s media list and the current statistics, generate the new media dict.
- The previous version media list is updated with current processing media list:
Setting new modification date & git url for each modified title.
Compute update level & targets if the processing is not a patch.
From this update, also conclude on whether new data was added, which informs how the version should be increased: if new title-year keys exist, the “addition” flag will lead to a major version increase.
- Parameters:
old_media_list (dict[str, dict]) – Media list of the previous version of this manifest.
- Returns:
Updated media list, and a flag indicating whether new data was added.
- Return type:
tuple[dict, bool]
- get_count_keys() list[str]
Get the list of count keys for this manifest’s media dict.
- Returns:
Count keys corresponding to this manifest’s DataStage.
- Return type:
list[str]
- has_title_year_key(title: str, year: str) bool
Verify whether the provided title and year have been processed.
- Parameters:
title (str) – Media title to check.
year (str) – Year to check.
- Returns:
True if the title-year pair has instantiated counts, False otherwise.
- Return type:
bool
- new_media(title: str, provider: str | None = None) dict[str, Any]
Add a new media dict to the media list, given its title.
- By default, this means the update information will be the following:
“update_type”: “addition”
“update_level”: “title”
“updated_years”: [] # all represented years will be new
“updated_fields”: [] # all fields will be new
- Parameters:
title (str) – Media title for which to add a new media.
- Returns:
The new media dict to add to the media list.
- Return type:
dict[str, Any]
- property output_mft_s3_path: str
Get this manifest’s output S3 path based on its output bucket.
The manifest will be uploaded to the S3 bucket and partition corresponding to the value provided for its input argument s3_output_bucket. If the version attribute for this manifest is not defined, the S3 output path cannot be provided and the empty string will be returned.
- Returns:
- Full S3 path of this manifest if the version is already defined,
the empty string otherwise.
- Return type:
str
- overall_stats(title_stats: list[MediaStatistics]) list[dict]
Generate the overall stats and append the ones from the input manifest.
- Parameters:
title_stats (list[MediaStatistics]) – List of all title-level statistics used to compute the overall stats.
- Returns:
This manifest’s overall stats with the ones of previous stages.
- Return type:
list[dict]
- provider_level_stats(title_stats: list[MediaStatistics]) list[dict]
Generate the provider-level stats and append the ones from the input manifest.
- Parameters:
title_stats (list[MediaStatistics]) – List of all title-level statistics used to compute the provider-level stats.
- Returns:
This manifest’s provider-level stats with the ones of previous stages.
- Return type:
list[dict]
- replace_by_ci_id(ci_id: str, counts: dict[str, int], src_medium: str | None = None, provider: str | None = None) bool
Replace the current counts for a CI id’s title-year pair with new ones.
Warning
This operation will overwrite any current counts corresponding to the media title and year of the provided content-item ID. If the goal isn’t to overwrite these counts, add_by_ci_id is better suited.
- Parameters:
ci_id (str) – Content-item canonical ID to which the counts correspond.
counts (dict[str, int]) – Counts for that ID to overwrite current counts with.
- Returns:
True if the stats’ modification was successful, False otherwise.
- Return type:
bool
- replace_by_title_year(title: str, year: str, counts: dict[str, int], src_medium: str | None = None, provider: str | None = None) bool
Replace the current counts for a given title-year pair with new ones.
Warning
This operation will overwrite any current counts corresponding to the provided media title and year. If the goal isn’t to overwrite these counts, add_by_title_year is better suited.
- Parameters:
title (str) – Media title to which the counts correspond.
year (str) – Year to which the counts correspond.
counts (dict[str, int]) – Counts for that title and year to overwrite current counts with.
- Returns:
True if the stats’ modification was successful, False otherwise.
- Return type:
bool
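The difference between the add_* and replace_* methods can be shown with a small in-memory model. This is a hypothetical sketch: the nested dict standing in for the manifest's _processing_stats and the function bodies are illustrative, not the library's code.

```python
from collections import defaultdict

# Hypothetical stand-in for the manifest's internal statistics:
# {title: {year: {count_key: value}}}
processing_stats: dict = defaultdict(dict)


def add_by_title_year(title: str, year: str, counts: dict[str, int]) -> bool:
    """Sum the new counts into whatever was already recorded for (title, year)."""
    current = processing_stats[title].setdefault(year, {})
    for key, value in counts.items():
        current[key] = current.get(key, 0) + value
    return True


def replace_by_title_year(title: str, year: str, counts: dict[str, int]) -> bool:
    """Discard what was recorded for (title, year) and keep only the new counts."""
    processing_stats[title][year] = dict(counts)
    return True


add_by_title_year("title_a", "1900", {"issues": 2, "pages": 8})
add_by_title_year("title_a", "1900", {"issues": 1, "pages": 4})
print(processing_stats["title_a"]["1900"])  # {'issues': 3, 'pages': 12}
replace_by_title_year("title_a", "1900", {"issues": 1, "pages": 4})
print(processing_stats["title_a"]["1900"])  # {'issues': 1, 'pages': 4}
```

Adding suits counting issue by issue. Replacing suits cases where a whole title-year has been recomputed at once.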
- title_level_stats(media_list: dict[str, dict]) tuple[list[MediaStatistics], dict[str, dict]]
Compute the title-level statistics from the new media list.
Also removes the stats_as_dict field from the media list, and returns the media list with each MediaStatistics object “pretty printed”.
- Parameters:
media_list (dict[str, dict]) – Updated media list for this manifest.
- Returns:
- New title-level stats and
media list.
- Return type:
tuple[list[MediaStatistics], dict[str, dict]]
- update_media_stats(title: str, yearly_stats: dict[str, dict], old_media_list: dict[str, dict]) dict | list[str]
Update a title’s media statistics given its newly computed yearly stats.
Note that it’s actually the old_media_list’s contents which are updated when necessary.
- In addition, the value of self.only_counting will change the behavior:
When False, the computation of the manifest should follow a processing, and all data within the _processing_stats (here yearly_stats for 1 title) will be considered to have been modified (or re-generated).
When True, the manifest is computed to verify the contents of the data, and the media’s information will be updated only if differences in statistics are found between the previous and current version.
- Parameters:
title (str) – Media title for which to update the media list.
yearly_stats (dict[str, dict]) – New yearly statistics for the title.
old_media_list (dict[str, dict]) – Previous version manifest’s media list.
- Returns:
Previous manifest’s media list potentially updated to match new counts, and the list of years which have been modified.
- Return type:
Union[dict, list[str]]
- validate_and_export_manifest(push_to_git: bool = False, commit_msg: str | None = None) bool
Validate the current manifest against a schema and export it (s3 and Git).
This function will always upload the generated manifest to S3, using a path constructed based on self.output_bucket_name and the DataStage.
If push_to_git is True, by default the commit message used will be “Add generated manifest file {filename}.” It can be overridden.
Note
If a problem occurs when pushing to Git, a critical message will be logged, but it won’t modify or alter the upload of the manifest to S3.
- Parameters:
push_to_git (bool, optional) – Whether to also push the generated manifest to GitHub (impresso/impresso-data-release). Defaults to False.
commit_msg (Optional[str], optional) – Commit message to override the default message. Defaults to None.
- Returns:
Whether the upload to s3 was successful.
- Return type:
bool
Versioning Helpers
Helper functions to read, generate and write data versioning manifests.
- impresso_essentials.versioning.helpers.add_media_source_metadata(title: str, old_media_title_info: dict[str, dict], provider: str | None = None) dict[str, Any]
Add the source metadata to an existing media title info dict.
- Parameters:
title (str) – Impresso alias of the media title.
old_media_title_info (dict[str, dict]) – Existing info to update.
- Returns:
Media title info dict with additional source metadata.
- Return type:
dict[str, Any]
- impresso_essentials.versioning.helpers.extract_version(name_or_path: str, as_int: bool = False) str | int
Extract the version from a string filename or path.
This function is in particular meant to extract the version from paths or filenames of manifests, structured as [data-stage]_vM-m-p.json.
- Parameters:
name_or_path (str) – Filename or path from which to extract the version.
as_int (bool, optional) – Whether to return the extracted version as int or str. Defaults to False.
- Returns:
Extracted version, as int or str based on as_int.
- Return type:
Union[str, int]
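A minimal sketch of the string case, assuming the filename pattern above. It returns the version in the "vM.m.p" form checked by validate_version; whether the library returns that form or keeps the dashes is an assumption, and the as_int option is left out because its integer encoding is not documented here. The bucket name in the example is hypothetical.

```python
import re


def extract_version(name_or_path: str) -> str:
    """Extract "vM.m.p" from a manifest name like "[data-stage]_vM-m-p.json"."""
    filename = name_or_path.rsplit("/", 1)[-1]
    # Anchored at the end, so underscores inside the data stage name do not interfere.
    match = re.search(r"_v(\d+)-(\d+)-(\d+)\.json$", filename)
    if match is None:
        raise ValueError(f"No version found in {name_or_path!r}")
    return "v" + ".".join(match.groups())


print(extract_version("s3://some-bucket/canonical_v2-1-0.json"))  # v2.1.0
```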
- impresso_essentials.versioning.helpers.filter_new_or_modified_media(rebuilt_mft_json: dict[str, Any], previous_mft_json: dict[str, Any]) dict[str, Any]
Compares two manifests to determine new or modified media items.
A typical use case is an atomic update, when only media items added or modified since the previous process need to be ingested or processed.
- Parameters:
rebuilt_mft_json (dict[str, Any]) – json of the rebuilt manifest (new).
previous_mft_json (dict[str, Any]) – json of the previous process manifest.
- Returns:
A manifest identical to ‘rebuilt_mft_json’ but only with media items that are new or modified in the media list.
- Return type:
dict[str, Any]
Example:
>>> new_or_modified = filter_new_or_modified_media(rebuilt_mft_json, previous_mft_json)
>>> print(new_or_modified["media_list"])
[{'media_title': 'new_media_item_1', 'last_modif_date': '2024-04-04T12:00:00Z', ...}, {'media_title': 'modified_media_item_2', 'last_modif_date': '2024-04-03T12:00:00Z', ...}]
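The comparison can be sketched as follows, assuming that media items are matched on media_title and that an item counts as modified when its last_modif_date differs from the previous manifest's. This is an illustration of the idea, not the library's exact criterion.

```python
import copy
from typing import Any


def filter_new_or_modified_media(
    rebuilt_mft_json: dict[str, Any], previous_mft_json: dict[str, Any]
) -> dict[str, Any]:
    # Last modification date of every media item in the previous manifest.
    previous_dates = {
        item["media_title"]: item.get("last_modif_date")
        for item in previous_mft_json["media_list"]
    }
    # Work on a copy so the rebuilt manifest itself is left untouched.
    filtered = copy.deepcopy(rebuilt_mft_json)
    filtered["media_list"] = [
        item
        for item in filtered["media_list"]
        if item["media_title"] not in previous_dates  # new title
        or item.get("last_modif_date") != previous_dates[item["media_title"]]  # modified
    ]
    return filtered


previous = {"media_list": [
    {"media_title": "a", "last_modif_date": "2024-01-01"},
    {"media_title": "b", "last_modif_date": "2024-01-01"},
]}
rebuilt = {"media_list": [
    {"media_title": "a", "last_modif_date": "2024-01-01"},
    {"media_title": "b", "last_modif_date": "2024-04-03"},
    {"media_title": "c", "last_modif_date": "2024-04-04"},
]}
result = filter_new_or_modified_media(rebuilt, previous)
print([item["media_title"] for item in result["media_list"]])  # ['b', 'c']
```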
- impresso_essentials.versioning.helpers.find_s3_data_manifest_path(bucket_name: str, data_stage: str, partition: str | None = None) str | None
Find and return the latest data manifest in a given S3 bucket.
On S3, different data stages are stored in different ways. In particular, data stages corresponding to enrichments are all placed in the same bucket but in different partitions, whereas the “canonical”, “rebuilt” and “evenized-rebuilt” data stages, as well as the Solr-related ones, each have their own bucket.
- Parameters:
bucket_name (str) – Name of the bucket in which to look.
data_stage (str) – Data stage corresponding to the manifest to fetch.
partition (Optional[str], optional) – Partition within the bucket to look into. Defaults to None.
- Returns:
- S3 path of the latest manifest in the bucket, None if no
manifests were found inside.
- Return type:
Optional[str]
- impresso_essentials.versioning.helpers.get_media_item_years(mnf_json: dict[str, Any]) dict[str, dict[str, float]]
Retrieves the s3 key and size in MB of each year of media items from a manifest.
- Parameters:
mnf_json (dict) – A manifest dictionary.
- Returns:
A dictionary where media titles are keys, and each value is a dictionary mapping the s3 key of each year to its size in MB.
- Return type:
dict[str, dict[str, float]]
- impresso_essentials.versioning.helpers.get_media_titles(input_data: dict[str, Any] | list[dict[str, Any]]) list[str]
Extracts media titles from the input data which can be either a manifest or a media list.
- Parameters:
input_data (Union[dict[str, Any], list[dict[str, Any]]]) – A manifest dictionary or the media list of a manifest.
- Returns:
A list of media titles extracted from the input data. Ex: [‘Title 1’, ‘Title 2’]
- Return type:
list[str]
- Raises:
TypeError – If the input data is not in the expected format.
KeyError – If the ‘media_title’ key is not found in the input data.
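A short sketch of the documented behaviour, accepting either a full manifest (with a "media_list" key) or a media list directly. It is an illustration, not the library's source.

```python
from typing import Any, Union


def get_media_titles(input_data: Union[dict[str, Any], list[dict[str, Any]]]) -> list[str]:
    if isinstance(input_data, dict):
        media_list = input_data["media_list"]  # full manifest
    elif isinstance(input_data, list):
        media_list = input_data  # already a media list
    else:
        raise TypeError(f"Expected a manifest dict or a media list, got {type(input_data).__name__}")
    # A missing "media_title" key raises KeyError, as documented above.
    return [item["media_title"] for item in media_list]


print(get_media_titles({"media_list": [{"media_title": "Title 1"}, {"media_title": "Title 2"}]}))
# ['Title 1', 'Title 2']
```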
- impresso_essentials.versioning.helpers.increment_version(prev_version: str, increment: str) str
Update given version according to the given increment.
When the increment is major or minor, all following numbers are reset to 0.
- Parameters:
prev_version (str) – Version to increment
increment (str) – Increment, can be one of major, minor and patch.
- Raises:
An exception if the increment value provided is not valid.
- Returns:
Version incremented accordingly.
- Return type:
str
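The reset rule (a major or minor bump zeroes the numbers after it) can be sketched as follows, assuming the "vM.m.p" format described under validate_version. Raising ValueError for an invalid increment is an assumption; the exception type used by the library is not specified.

```python
def increment_version(prev_version: str, increment: str) -> str:
    # "vM.m.p" -> three ints
    major, minor, patch = (int(part) for part in prev_version.lstrip("v").split("."))
    if increment == "major":
        major, minor, patch = major + 1, 0, 0
    elif increment == "minor":
        minor, patch = minor + 1, 0
    elif increment == "patch":
        patch += 1
    else:
        raise ValueError(f"Invalid increment {increment!r}: expected major, minor or patch")
    return f"v{major}.{minor}.{patch}"


print(increment_version("v1.4.2", "minor"))  # v1.5.0
```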
- impresso_essentials.versioning.helpers.init_media_info(add: bool = True, full_title: bool = True, years: list[str] | None = None, fields: list[str] | None = None) dict[str, Any]
Initialize the media update dict for a title given relevant information.
All the update information relates to the newly processed data, in comparison with the data computed during the last processing.
- Parameters:
add (bool, optional) – Whether new data was added. Defaults to True.
full_title (bool, optional) – Whether all the title’s years were modified. Defaults to True.
years (Optional[list[str]], optional) – When full_title is False, the specific years which were modified/updated. Defaults to None.
fields (Optional[list[str]], optional) – List of specific fields that were modified/updated. Defaults to None.
- Returns:
Instantiated dict with the update information for a given media.
- Return type:
dict[str, Any]
- impresso_essentials.versioning.helpers.manifest_summary(mnf_json: dict[str, Any], extended_summary: bool = False) None
Generate a summary of the manifest data.
- Parameters:
mnf_json (dict) – A dictionary containing manifest data.
extended_summary (bool, optional) – Whether to include an extended summary with year statistics. Defaults to False.
- Returns:
None
Prints: Summary of the manifest including the number of media items, additions, and modifications.
Example
>>> manifest_summary(manifest_json)
Summary of manifest /path/to/manifest.json:
Number of media items: 10 (8 from set)
Number of addition at title level: 5
Number of addition at year level: 3
Number of modification at title level: 2
Number of modification at year level: 1
- impresso_essentials.versioning.helpers.media_list_from_mft_json(json_mft: dict[str, Any]) dict[str, dict]
Extract the media_list from a manifest as a dict where each title is a key.
For each title, all fields from the original media list will still be present along with an additional stats_as_dict field containing a dict mapping each year to its specific statistics.
- As a result:
All represented titles are within the keys of the returned media list.
For each title, represented years are in the keys of its stats_as_dict field.
- Parameters:
json_mft (dict[str, Any]) – Dict following the JSON schema of a manifest from which to extract the media list.
- Returns:
Media list of given manifest, with stats_as_dict field.
- Return type:
dict[str, dict]
- impresso_essentials.versioning.helpers.read_manifest_from_s3(bucket_name: str, data_stage: DataStage | str, partition: str | None = None) tuple[str, dict[str, Any]] | None
Read and load manifest given an S3 bucket.
- Parameters:
bucket_name (str) – Name of the s3 bucket to look into.
data_stage (Union[DataStage, str]) – Data stage corresponding to the manifest to fetch.
partition (Optional[str], optional) – Partition within the bucket to look into. Defaults to None.
- Returns:
S3 path of the manifest and corresponding contents, if a manifest was found, None otherwise.
- Return type:
tuple[str, dict[str, Any]] | tuple[None, None]
- impresso_essentials.versioning.helpers.read_manifest_from_s3_path(manifest_s3_path: str) dict[str, Any] | None
Read and extract the contents of an arbitrary manifest.
- Parameters:
manifest_s3_path (str) – S3 path of the manifest to read.
- Returns:
Contents of manifest if found on S3, None otherwise.
- Return type:
Optional[dict[str, Any]]
- impresso_essentials.versioning.helpers.remove_media_in_manifest(mnf_json: dict[str, Any], white_list: list[str]) None
Removes media items from the given manifest JSON object based on a whitelist. A typical use case is ingesting or processing only part of the media.
- Parameters:
mnf_json (dict[str, Any]) – The manifest JSON object containing a ‘media_list’.
white_list (list[str]) – A list of media titles to be retained in the manifest.
- Returns:
Modifies the input manifest JSON object in-place by removing media items not in the whitelist.
- Return type:
None
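Because the function returns None and modifies the manifest in place, a sketch needs to mutate the existing list rather than rebind the key. The version below does this with slice assignment; it illustrates the documented behaviour rather than reproducing the library's source.

```python
from typing import Any


def remove_media_in_manifest(mnf_json: dict[str, Any], white_list: list[str]) -> None:
    allowed = set(white_list)
    # Slice assignment updates the existing list object, so the change is in place.
    mnf_json["media_list"][:] = [
        item for item in mnf_json["media_list"] if item["media_title"] in allowed
    ]


manifest = {"media_list": [{"media_title": "title_a"}, {"media_title": "title_b"}]}
media_list_ref = manifest["media_list"]
remove_media_in_manifest(manifest, ["title_b"])
print(manifest["media_list"])  # [{'media_title': 'title_b'}]
```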
- impresso_essentials.versioning.helpers.sort_media_list_years_and_titles(media_list: dict[str, dict]) dict[str, dict]
Sort the media titles and corresponding years by alphabetical order.
The media_list is in a format such that:
- media_list[title][“stats_as_dict”][year] = stats, where title and year are strings.
Since it’s a dict, it can’t be sorted directly. However, a new dict with the exact same format can be populated with sorted keys. This ensures the ordering in manifests always stays the same.
- Parameters:
media_list (dict[str, dict]) – Media list to sort.
- Returns:
Same dict, where the title and subsequent year keys are sorted.
- Return type:
dict[str, dict]
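Python dicts preserve insertion order, so rebuilding the dict in sorted key order is enough to fix the ordering. A sketch, based only on the media_list format described above:

```python
def sort_media_list_years_and_titles(media_list: dict[str, dict]) -> dict[str, dict]:
    sorted_media_list = {}
    for title in sorted(media_list):
        # Shallow copy so the input media list is not modified.
        title_info = dict(media_list[title])
        yearly = title_info["stats_as_dict"]
        title_info["stats_as_dict"] = {year: yearly[year] for year in sorted(yearly)}
        sorted_media_list[title] = title_info
    return sorted_media_list


media_list = {
    "title_b": {"stats_as_dict": {"1901": {"issues": 2}, "1899": {"issues": 1}}},
    "title_a": {"stats_as_dict": {"1900": {"issues": 3}}},
}
result = sort_media_list_years_and_titles(media_list)
print(list(result), list(result["title_b"]["stats_as_dict"]))  # ['title_a', 'title_b'] ['1899', '1901']
```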
- impresso_essentials.versioning.helpers.validate_version(v: str, regex: str = '^v([0-9]+[.]){2}[0-9]+$') str | None
Validate the provided string version against a regex.
The provided version should be in format “vM.m.p”, where M, m and p are integers representing respectively the Major, minor and patch version.
- Parameters:
v (str) – version in string format to validate.
regex (str, optional) – Regex against which to match the version. Defaults to “^v([0-9]+[.]){2}[0-9]+$”.
- Returns:
The provided version if it’s valid, None otherwise.
- Return type:
Optional[str]
- impresso_essentials.versioning.helpers.version_as_list(version: str) list[int]
Return the provided string version as a list of three ints.
- Parameters:
version (str) – String version to return as list
- Returns:
List of length 3 whose indices respectively correspond to the Major, minor and patch versions.
- Return type:
list[int]
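validate_version and version_as_list fit together naturally. The sketch below reuses the default regex documented for validate_version. Having version_as_list validate its input first, and raise ValueError otherwise, is an assumption made for the example.

```python
import re

VERSION_REGEX = r"^v([0-9]+[.]){2}[0-9]+$"


def validate_version(v: str, regex: str = VERSION_REGEX):
    """Return v if it matches "vM.m.p", None otherwise."""
    return v if re.match(regex, v) else None


def version_as_list(version: str) -> list[int]:
    if validate_version(version) is None:
        raise ValueError(f"Invalid version: {version!r}")
    # Drop the leading "v", then split "M.m.p" into three ints.
    return [int(part) for part in version[1:].split(".")]


print(validate_version("1.0.0"))      # None (missing the leading "v")
print(version_as_list("v3.10.2"))    # [3, 10, 2]
```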
Data Aggregator Helpers
Helper functions used to compute and aggregate the statistics of manifests.
- impresso_essentials.versioning.aggregators.agg(s)
The function which will aggregate the results from all the partitions (reduce). Part of the aggregating function(s) implementing a custom nunique() aggregation.
- impresso_essentials.versioning.aggregators.chunk(s)
The function applied to the individual partitions (map). Part of the aggregating function(s) implementing a custom nunique() aggregation.
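The reason a distinct count needs a dedicated map/reduce pair is that per-partition unique counts cannot simply be summed when the same value appears in several partitions. The pure-Python analogue below illustrates the idea. The function names are hypothetical, and the library's chunk and agg presumably operate on pandas objects within a Dask aggregation rather than on lists of tuples.

```python
from collections import defaultdict


def map_distinct(partition):
    """Map step, run on each partition: collect the distinct values seen per key."""
    seen = defaultdict(set)
    for key, value in partition:
        seen[key].add(value)
    return seen


def reduce_distinct(partial_results):
    """Reduce step: union the per-partition sets, then count, so duplicates count once."""
    merged = defaultdict(set)
    for seen in partial_results:
        for key, values in seen.items():
            merged[key] |= values
    return {key: len(values) for key, values in merged.items()}


partitions = [
    [("title_a", "issue-1"), ("title_a", "issue-2")],
    [("title_a", "issue-2"), ("title_b", "issue-3")],
]
print(reduce_distinct(map_distinct(p) for p in partitions))  # {'title_a': 2, 'title_b': 1}
```

Summing per-partition unique counts would instead report 3 issues for title_a, because issue-2 appears in both partitions.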
- impresso_essentials.versioning.aggregators.compute_stats_in_can_consolidated_bag(s3_can_cons_issues: Bag, client: Client | None = None, title: str | None = None, src_medium: str | None = None) list[dict[str, Any]]
Computes number of issues and supports per alias from a Dask bag of consolidated canonical data.
- Parameters:
s3_can_cons_issues (db.core.Bag) – Bag with the contents of consolidated canonical files to compute statistics on.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
src_medium (str, optional) – The source medium of this issue. Defaults to None.
- Returns:
List of counts that match canonical DataStatistics keys.
- Return type:
list[dict[str, Any]]
- impresso_essentials.versioning.aggregators.compute_stats_in_canonical_bag(s3_canonical_issues: Bag, client: Client | None = None, title: str | None = None, src_medium: str | None = None) list[dict[str, Any]]
Computes number of issues and supports per alias from a Dask bag of canonical data.
- Parameters:
s3_canonical_issues (db.core.Bag) – Bag with the contents of canonical files to compute statistics on.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
src_medium (str, optional) – The source medium of this issue. Defaults to None.
- Returns:
List of counts that match canonical DataStatistics keys.
- Return type:
list[dict[str, Any]]
- impresso_essentials.versioning.aggregators.compute_stats_in_classif_img_bag(s3_classif_images: Bag, client: Client | None = None, title: str | None = None) list[dict[str, Any]]
Compute stats on a dask bag of topic modeling output content-items.
- Parameters:
s3_classif_images (db.core.Bag) – Bag with the contents of the image classification files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match image classification DataStatistics keys.
- Return type:
list[dict[str, Any]]
- impresso_essentials.versioning.aggregators.compute_stats_in_doc_emb_bag(s3_doc_embeddings: Bag, client: Client | None = None, title: str | None = None) list[dict[str, int | str]]
Compute stats on a dask bag of document embeddings.
- Parameters:
s3_doc_embeddings (db.core.Bag) – Bag with the contents of doc embeddings files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match document embeddings DataStatistics keys.
- Return type:
list[dict[str, Union[int, str]]]
- impresso_essentials.versioning.aggregators.compute_stats_in_entities_bag(s3_entities: Bag, client: Client | None = None, title: str | None = None) list[dict[str, Any]]
Compute stats on a dask bag of entities output content-items.
A problem was found with the initial approach of exploding and then summing the values of content_items_out and ne_mentions: the summed counts were multiplied by the length of ne_entities.
- When len(ne_entities) = 0, the row was omitted altogether.
- When len(ne_entities) > 1, the number of CIs was overcounted.
The fix has two parts:
- Using tunique for the number of CIs (same logic as for the issue IDs).
- Aggregating the sum of ne_mentions separately, BEFORE the explode, to prevent duplication and ensure that CIs with zero entities are still counted.
- Parameters:
s3_entities (db.core.Bag) – Bag with the contents of entity files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match NE DataStatistics keys.
- Return type:
list[dict[str, Any]]
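The counting issue described above can be reproduced in plain Python. The sketch below is illustrative only: the record layout (`ci_id`, `content_items_out`, `ne_mentions`, `ne_entities`) is a hypothetical simplification of the entity output, and a Python set stands in for the `tunique` aggregation used in the actual Dask code.

```python
from typing import Any

# Hypothetical, simplified entity records: one per content-item (CI).
records: list[dict[str, Any]] = [
    {"ci_id": "ci-1", "content_items_out": 1, "ne_mentions": 3, "ne_entities": ["Q1", "Q2", "Q3"]},
    {"ci_id": "ci-2", "content_items_out": 1, "ne_mentions": 0, "ne_entities": []},
    {"ci_id": "ci-3", "content_items_out": 1, "ne_mentions": 1, "ne_entities": ["Q4"]},
]


def explode(recs: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """One row per entity; CIs without entities yield no row at all."""
    return [{**rec, "ne_entity": ent} for rec in recs for ent in rec["ne_entities"]]


def naive_counts(recs: list[dict[str, Any]]) -> dict[str, int]:
    """Initial approach: explode first, then sum the per-row values."""
    rows = explode(recs)
    return {
        "content_items_out": sum(r["content_items_out"] for r in rows),
        "ne_mentions": sum(r["ne_mentions"] for r in rows),
    }


def fixed_counts(recs: list[dict[str, Any]]) -> dict[str, int]:
    """Fix: count unique CI ids and sum mentions before any explode."""
    return {
        "content_items_out": len({r["ci_id"] for r in recs}),
        "ne_mentions": sum(r["ne_mentions"] for r in recs),
    }


print(naive_counts(records))  # {'content_items_out': 4, 'ne_mentions': 10}
print(fixed_counts(records))  # {'content_items_out': 3, 'ne_mentions': 4}
```

The naive version counts ci-1 three times and drops ci-2, while the fixed version counts each CI once, including the one with no entities.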
- impresso_essentials.versioning.aggregators.compute_stats_in_img_emb_bag(s3_emb_images: Bag, client: Client | None = None, title: str | None = None) list[dict[str, int | str]]
Compute stats on a dask bag of image embedding output content-items.
- Parameters:
s3_emb_images (db.core.Bag) – Bag with the contents of the embedded images files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match image embeddings DataStatistics keys.
- Return type:
list[dict[str, Union[int, str]]]
- impresso_essentials.versioning.aggregators.compute_stats_in_langid_ocrqa_bag(s3_langid_ocrqas: Bag, client: Client | None = None, title: str | None = None) list[dict[str, int | str]]
Compute stats on a dask bag of combined language identification and OCRQA outputs.
- Parameters:
s3_langid_ocrqas (db.core.Bag) – Bag with the contents of the OCRQA files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match OCRQA output DataStatistics keys.
- Return type:
list[dict[str, Union[int, str]]]
- impresso_essentials.versioning.aggregators.compute_stats_in_langident_bag(s3_langident: Bag, client: Client | None = None, title: str | None = None) list[dict[str, Any]]
Compute stats on a dask bag of langident output content-items.
- Parameters:
s3_langident (db.core.Bag) – Bag of lang-id content-items.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match langident DataStatistics keys.
- Return type:
list[dict[str, Any]]
- impresso_essentials.versioning.aggregators.compute_stats_in_lingproc_bag(s3_lingprocs: Bag, client: Client | None = None, title: str | None = None) list[dict[str, int | str]]
Compute stats on a dask bag of linguistic preprocessing output content-items.
- Parameters:
s3_lingprocs (db.core.Bag) – Bag with the contents of the lingproc files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match lingproc DataStatistics keys.
- Return type:
list[dict[str, Union[int, str]]]
- impresso_essentials.versioning.aggregators.compute_stats_in_ocrqa_bag(s3_ocrqas: Bag, client: Client | None = None, title: str | None = None) list[dict[str, int | str]]
Compute stats on a dask bag of OCRQA outputs.
- Parameters:
s3_ocrqas (db.core.Bag) – Bag with the contents of the OCRQA files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match OCRQA output DataStatistics keys.
- Return type:
list[dict[str, Union[int, str]]]
- impresso_essentials.versioning.aggregators.compute_stats_in_rebuilt_bag(rebuilt_articles: Bag, key: str = '', include_alias: bool = False, passim: bool = False, client: Client | None = None, title: str | None = None) list[dict[str, int | str]]
Compute stats on a dask bag of rebuilt output content-items.
- Parameters:
rebuilt_articles (db.core.Bag) – Bag with the contents of rebuilt files.
key (str, optional) – Optional title-year pair for on-the-fly computation. Defaults to “”.
include_alias (bool, optional) – Whether to include the title in the groupby; not necessary for on-the-fly computation. Defaults to False.
passim (bool, optional) – True if rebuilt is in passim format. Defaults to False.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match rebuilt or passim DataStatistics keys.
- Return type:
list[dict[str, Union[int, str]]]
- impresso_essentials.versioning.aggregators.compute_stats_in_solr_text_ing_bag(s3_solr_ing_cis: Bag, client: Client | None = None, title: str | None = None) list[dict[str, int | str]]
Compute stats on a dask bag of Solr text post-ingestion reports.
- Parameters:
s3_solr_ing_cis (db.core.Bag) – Bag with the CI ids and token lengths from Solr.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match Solr text ingestion DataStatistics keys.
- Return type:
list[dict[str, Union[int, str]]]
- impresso_essentials.versioning.aggregators.compute_stats_in_text_reuse_passage_bag(s3_tr_passages: Bag, client: Client | None = None, title: str | None = None) list[dict[str, Any]]
Compute stats on a dask bag of text-reuse passages.
- Parameters:
s3_tr_passages (Bag) – Text-reuse passages contained in one output file.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match text-reuse DataStatistics keys.
- Return type:
list[dict[str, Any]]
- impresso_essentials.versioning.aggregators.compute_stats_in_topics_bag(s3_topics: Bag, client: Client | None = None, title: str | None = None) list[dict[str, Any]]
Compute stats on a dask bag of topic modeling output content-items.
- Parameters:
s3_topics (db.core.Bag) – Bag with the contents of topics files.
client (Client | None, optional) – Dask client. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
- Returns:
List of counts that match topics DataStatistics keys.
- Return type:
list[dict[str, Any]]
- impresso_essentials.versioning.aggregators.counts_for_canonical_issue(issue: dict[str, Any], incl_alias_yr: bool = False, src_medium: str | None = None) dict[str, int]
Given the canonical representation of an issue, get its counts.
- Parameters:
issue (dict[str, Any]) – Canonical JSON representation of an issue.
incl_alias_yr (bool, optional) – Whether the newspaper title and year should be included in the returned dict for later aggregation. Defaults to False.
src_medium (str, optional) – The source medium of this issue. Defaults to None.
- Returns:
Dict listing the counts for this issue, ready to be aggregated.
- Return type:
dict[str, int]
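Because the per-issue dict is meant to be aggregated, the counts of all issues of one title-year can be summed key by key. A minimal sketch, assuming the dicts carry `media_alias` and `year` fields (as when `incl_alias_yr=True`) and using hypothetical count keys `issues` and `pages`; the real keys are those of the canonical DataStatistics.

```python
from collections import Counter, defaultdict
from typing import Any

ID_KEYS = ("media_alias", "year")  # assumed grouping fields


def aggregate_issue_counts(issue_counts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Sum per-issue count dicts into one dict per (media_alias, year)."""
    totals: dict[tuple[str, str], Counter] = defaultdict(Counter)
    for counts in issue_counts:
        key = (counts["media_alias"], counts["year"])
        # Counter.update adds values key by key; the ID fields are excluded.
        totals[key].update({k: v for k, v in counts.items() if k not in ID_KEYS})
    return [
        {"media_alias": alias, "year": year, **dict(c)}
        for (alias, year), c in sorted(totals.items())
    ]


per_issue = [
    {"media_alias": "GDL", "year": "1900", "issues": 1, "pages": 4},
    {"media_alias": "GDL", "year": "1900", "issues": 1, "pages": 6},
    {"media_alias": "GDL", "year": "1901", "issues": 1, "pages": 2},
]
print(aggregate_issue_counts(per_issue))
```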
- impresso_essentials.versioning.aggregators.counts_for_rebuilt(rebuilt_ci: dict[str, Any], include_alias: bool = False, passim: bool = False) dict[str, int | str]
Compute the counts for one given rebuilt content-item, matching the count keys.
- Parameters:
rebuilt_ci (dict[str, Any]) – Rebuilt content-item from which to extract counts.
include_alias (bool, optional) – Whether to include the title in resulting dict, not necessary for on-the-fly computation. Defaults to False.
passim (bool, optional) – True if rebuilt is in passim format. Defaults to False.
- Returns:
Dict with rebuilt (passim) keys and counts for one CI.
- Return type:
dict[str, Union[int, str]]
- impresso_essentials.versioning.aggregators.finalize(s)
The optional function applied to the result of the agg_tu functions. Part of the aggregating function(s) implementing np.nunique().
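The per-partition (map) function and this finalize function follow the chunk/aggregate/finalize pattern of a Dask `dd.Aggregation`. The sketch below reproduces that pattern in plain Python for a per-group unique count, the equivalent of `np.nunique()`; the partition layout and the function names are illustrative and not the library's code (in Dask, these steps receive grouped Series rather than lists).

```python
from functools import reduce

Partition = list[tuple[str, str]]  # illustrative (group, value) pairs


def chunk(partition: Partition) -> dict[str, set[str]]:
    """Map step: collect the distinct values of each group within one partition."""
    out: dict[str, set[str]] = {}
    for group, value in partition:
        out.setdefault(group, set()).add(value)
    return out


def agg(left: dict[str, set[str]], right: dict[str, set[str]]) -> dict[str, set[str]]:
    """Reduce step: merge per-partition sets so duplicates across partitions collapse."""
    merged = {g: set(v) for g, v in left.items()}
    for group, values in right.items():
        merged.setdefault(group, set()).update(values)
    return merged


def finalize(merged: dict[str, set[str]]) -> dict[str, int]:
    """Final step: replace each set of distinct values by its size."""
    return {group: len(values) for group, values in merged.items()}


def nunique(partitions: list[Partition]) -> dict[str, int]:
    return finalize(reduce(agg, (chunk(p) for p in partitions), {}))


parts = [
    [("GDL-1900", "issue-1"), ("GDL-1900", "issue-2")],
    [("GDL-1900", "issue-2"), ("GDL-1901", "issue-3")],
]
print(nunique(parts))  # {'GDL-1900': 2, 'GDL-1901': 1}
```

Keeping sets until the last step is what makes the count correct: summing per-partition unique counts would count `issue-2` twice.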
- impresso_essentials.versioning.aggregators.freq(x: dict, cols: list[str] | None = None, for_can_cons: bool = False, col: list[str] | None = None) dict
Compute the frequency dict of the given column or columns.
- Parameters:
x (dict) – Dict corresponding to aggregated values for one title-year, which contains lists of values to count.
cols (list[str], optional) – List of keys (columns) with lists of values to count. Defaults to [“lang_fd”].
- Returns:
The statistics for the given title-year, with the value counts of the required columns.
- Return type:
dict
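A minimal sketch of the value counting described for freq, assuming each listed column holds a flat list of values (for instance language codes under `lang_fd`) that is replaced by a value-to-count mapping. The `for_can_cons` and `col` parameters are not modelled, and `freq_sketch` is a hypothetical name.

```python
from collections import Counter
from typing import Any, Optional


def freq_sketch(x: dict[str, Any], cols: Optional[list[str]] = None) -> dict[str, Any]:
    """Replace each list-valued column of x by a dict of value counts."""
    cols = ["lang_fd"] if cols is None else cols
    result = dict(x)  # shallow copy so the input dict is left untouched
    for col in cols:
        result[col] = dict(Counter(x.get(col, [])))
    return result


title_year = {"media_alias": "GDL", "year": "1900", "lang_fd": ["fr", "de", "fr"]}
print(freq_sketch(title_year))
```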
- impresso_essentials.versioning.aggregators.log_src_medium_mismatch(obj_id: str, stage: str, prov_src_medium: str, found_src_medium: str) None
Log that the source medium found in the data to aggregate does not match the one previously set.
- Parameters:
obj_id (str) – Impresso ID of the object for which the mismatch was observed.
stage (str) – Data Stage of the data which was being aggregated.
prov_src_medium (str) – Previously given source medium.
found_src_medium (str) – Source medium found in the data to be aggregated.
- Raises:
AttributeError – There was a mismatch in the expected and found source mediums.
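The log-then-raise behaviour can be sketched with the standard `logging` module. The message wording and the name `log_src_medium_mismatch_sketch` are hypothetical; only the logged mismatch and the raised `AttributeError` come from the description above.

```python
import logging

logger = logging.getLogger(__name__)


def log_src_medium_mismatch_sketch(
    obj_id: str, stage: str, prov_src_medium: str, found_src_medium: str
) -> None:
    """Log the source medium mismatch, then raise so the aggregation stops."""
    msg = (
        f"{obj_id} ({stage}): source medium found in the data ({found_src_medium}) "
        f"does not match the one previously set ({prov_src_medium})."
    )
    logger.error(msg)
    raise AttributeError(msg)
```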
Manifest Computing Script
Command-line script to generate a manifest for an S3 bucket or partition after a processing.
- Usage:
compute_manifest.py --config-file=<cf> --log-file=<lf> [--scheduler=<sch> --nworkers=<nw> --verbose]
Options:
- --config-file=<cf>
Path to config file containing all arguments for manifest computation.
- --log-file=<lf>
Path to log file to use.
- --scheduler=<sch>
Tell Dask to use an existing scheduler (otherwise a new one is created).
- --nworkers=<nw>
Number of threads per worker for the (local) Dask client (the option keeps the name “workers” to avoid changing the CLI).
- --verbose
Set the logging level to DEBUG (default is INFO).
- impresso_essentials.versioning.compute_manifest.add_stats_to_mft(manifest: DataManifest, media_alias: str, computed_stats: list[dict], src_medium: str | None = None, provider: str | None = None) DataManifest
Add the statistics computed for a given media alias to an instantiated manifest.
Performs validation to ensure that statistics are being added for the correct media title.
- Parameters:
manifest (DataManifest) – The manifest object to which the statistics will be added.
media_alias (str) – The alias representing the media title these stats belong to.
computed_stats (list[dict]) – A list of dictionaries, each containing computed statistics corresponding to a given stage along with media_alias and year keys.
src_medium (str | None, optional) – The source medium of the alias. Defaults to None.
provider (str | None, optional) – The data provider identifier. Defaults to None.
- Returns:
The updated DataManifest instance with the added statistics.
- Return type:
DataManifest
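The alias validation performed before adding statistics could look like the check below. This is a sketch under stated assumptions: it only inspects the `media_alias` key of each stats dict and raises a `ValueError` on mismatch, which is an assumed behaviour; the `DataManifest` update itself is not modelled, and `check_stats_alias` is a hypothetical name.

```python
from typing import Any


def check_stats_alias(media_alias: str, computed_stats: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the stats unchanged if every entry belongs to media_alias, else raise."""
    for stats in computed_stats:
        found = stats.get("media_alias")
        if found != media_alias:
            # Assumed behaviour: refuse to add stats computed for another title.
            raise ValueError(f"Stats for {found!r} cannot be added under {media_alias!r}.")
    return computed_stats
```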
- impresso_essentials.versioning.compute_manifest.aliases_to_process(config: dict[str, Any]) list[tuple[str, str]] | None
Generate a list of (provider, alias) pairs to consider based on the config.
Uses the input configuration to determine which provider-alias pairs should be included when building the manifest. Blacklisted aliases in alias_blacklist are excluded from the result.
If neither providers nor media_aliases are specified, the function returns None, indicating that all data should be processed. However, any aliases in the blacklist will still be excluded from the processing in this case.
- Parameters:
config (dict[str, Any]) – A configuration dictionary expected to contain:
- “providers” (list[str] | None): Optional list of provider names.
- “media_aliases” (list[str] | None): Optional list of specific media aliases.
- “alias_blacklist” (list[str] | None): Optional list of aliases to exclude.
- Returns:
A list of (provider, alias) tuples to process, or None if all data should be considered (no filtering).
- Return type:
list[tuple[str, str]] | None
Example
>>> config = {
...     "providers": ["BNF"],
...     "media_aliases": ["NZZ"],
...     "alias_blacklist": ["JGD", "marieclaire", "lepetitparisien", "legaulois", "lematin"],
... }
>>> aliases_to_process(config)
[('BNF', 'excelsior'), ('BNF', 'lafronde'), ('BNF', 'oeuvre'), ('BNF', 'jdpl'), ('BNF', 'lepji'), ('BNF', 'oecaen'), ('BNF', 'oerennes'), ('NZZ', 'NZZ')]
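The filtering rules described above can be sketched as follows. Since the way the aliases available for each provider are discovered is not described here, the sketch takes them as a hypothetical `inventory` mapping (provider to aliases); the function name is also hypothetical.

```python
from typing import Any, Optional


def aliases_to_process_sketch(
    config: dict[str, Any], inventory: dict[str, list[str]]
) -> Optional[list[tuple[str, str]]]:
    providers = config.get("providers") or []
    media_aliases = config.get("media_aliases") or []
    blacklist = set(config.get("alias_blacklist") or [])
    if not providers and not media_aliases:
        # No filtering requested: None means "process everything";
        # the blacklist must then be applied later in the pipeline.
        return None
    pairs = []
    for provider, aliases in inventory.items():
        for alias in aliases:
            selected = provider in providers or alias in media_aliases
            if selected and alias not in blacklist:
                pairs.append((provider, alias))
    return pairs


inventory = {"BNF": ["excelsior", "lafronde", "lematin"], "NZZ": ["NZZ"], "SNL": ["GDL"]}
cfg = {"providers": ["BNF"], "media_aliases": ["NZZ"], "alias_blacklist": ["lematin"]}
print(aliases_to_process_sketch(cfg, inventory))
```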
- impresso_essentials.versioning.compute_manifest.compute_stats_for_stage(files_bag: Bag, stage: DataStage, client: Client | None = None, title: str | None = None, src_medium: str | None = None) list[dict] | None
Compute statistics for a specific data stage.
- Parameters:
files_bag (db.core.Bag) – A bag containing files for statistics computation.
stage (DataStage) – The data stage for which statistics are computed.
client (Client | None, optional) – Dask client to use. Defaults to None.
title (str, optional) – Media title for which the stats are being computed. Defaults to None.
src_medium (str, optional) – Source medium of the title to process, used for canonical data. Defaults to None.
- Returns:
List of computed yearly statistics, or None if statistics computation for the given stage is not implemented.
- Return type:
list[dict] | None
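One way to obtain the “statistics or None” behaviour is a dispatch table from stage to statistics function. In the sketch below, `Stage` is a hypothetical stand-in for `DataStage` (its `UNSUPPORTED` member is invented for illustration), and the two statistics functions are placeholders for the `compute_stats_in_*_bag` aggregators.

```python
from enum import Enum
from typing import Any, Callable, Optional


class Stage(Enum):  # hypothetical stand-in for DataStage
    CANONICAL = "canonical"
    REBUILT = "rebuilt"
    UNSUPPORTED = "unsupported"


def stats_canonical(items: list[dict[str, Any]]) -> list[dict]:
    return [{"issues": len(items)}]  # placeholder statistics


def stats_rebuilt(items: list[dict[str, Any]]) -> list[dict]:
    return [{"content_items_out": len(items)}]  # placeholder statistics


STATS_FUNCTIONS: dict[Stage, Callable[[list[dict[str, Any]]], list[dict]]] = {
    Stage.CANONICAL: stats_canonical,
    Stage.REBUILT: stats_rebuilt,
}


def compute_stats_for_stage_sketch(items: list[dict[str, Any]], stage: Stage) -> Optional[list[dict]]:
    func = STATS_FUNCTIONS.get(stage)
    if func is None:
        return None  # statistics not implemented for this stage
    return func(items)
```

A table keeps adding a new stage down to one new entry, instead of growing an if/elif chain.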
- impresso_essentials.versioning.compute_manifest.create_manifest(config_dict: dict[str, Any], client: Client | None = None) None
Given its configuration, generate the manifest for a given s3 bucket partition.
Note
The contents of the configuration file (or dict) are described in the markdown file impresso_commons/data/manifest_config/manifest.config.example.md.
- Parameters:
config_dict (dict[str, Any]) – Configuration following the guidelines.
client (Client | None, optional) – Dask client to use.
- impresso_essentials.versioning.compute_manifest.get_files_to_consider(config: dict[str, Any]) dict[str, dict[str, list[str]]] | None
Get the list of S3 files to consider based on the provided configuration.
The s3_files mapping is now provider -> alias -> list of files.
- Parameters:
config (dict[str, Any]) – Configuration parameters with the S3 bucket, titles, and file extensions.
- Returns:
Dict mapping each provider to a dict mapping each alias to the S3 files to consider, or None if no files are found.
- Return type:
dict[str, dict[str, list[str]]] | None
- Raises:
ValueError – If file_extensions in the config is empty or None.
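A minimal sketch of how a flat list of S3 paths could be filtered by extension and grouped into the provider -> alias -> list of files mapping. It assumes a hypothetical path layout `s3://<bucket>/<provider>/<alias>/<file>` and a single string extension, and it only works on path strings, without listing the bucket.

```python
from collections import defaultdict
from typing import Optional


def group_s3_files(
    paths: list[str], file_extensions: Optional[str]
) -> Optional[dict[str, dict[str, list[str]]]]:
    if not file_extensions:
        raise ValueError("file_extensions must be a non-empty value.")
    grouped: dict[str, dict[str, list[str]]] = defaultdict(lambda: defaultdict(list))
    for path in paths:
        parts = path.split("/")  # ["s3:", "", bucket, provider, alias, file]
        if len(parts) < 6 or not path.endswith(file_extensions):
            continue
        provider, alias = parts[3], parts[4]
        grouped[provider][alias].append(path)
    if not grouped:
        return None  # no files found
    return {prov: dict(aliases) for prov, aliases in grouped.items()}


paths = [
    "s3://bucket/BNF/excelsior/excelsior-1910.jsonl.bz2",
    "s3://bucket/BNF/excelsior/excelsior-1911.jsonl.bz2",
    "s3://bucket/NZZ/NZZ/NZZ-1900.jsonl.bz2",
    "s3://bucket/NZZ/NZZ/README.txt",
]
print(group_s3_files(paths, ".jsonl.bz2"))
```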
- impresso_essentials.versioning.compute_manifest.main()
- impresso_essentials.versioning.compute_manifest.process_altogether(manifest: DataManifest, s3_files: dict[str, list[str]], stage: DataStage, client: Client | None) DataManifest
Process all fetched S3 files at once, filtering them by title to populate the manifest.
Equivalent to process_by_title, but intended for large unified datasets.
- Parameters:
manifest (DataManifest) – The manifest object to be populated with computed statistics.
s3_files (dict[str, list[str]]) – A dictionary mapping each provider to a list of S3 file paths.
stage (DataStage) – The stage of data processing, which determines how statistics should be computed.
client (Client | None) – Optional Dask client to parallelize computation when available.
- Returns:
The updated manifest instance containing statistics for each processed media alias.
- Return type:
DataManifest
- impresso_essentials.versioning.compute_manifest.process_by_title(manifest: DataManifest, s3_files: dict[str, list[str]], stage: DataStage, client: Client | None) DataManifest
Compute the statistics for the given stage by media title and add them to the manifest.
Invalid or mismatched aliases are logged and ignored.
- Parameters:
manifest (DataManifest) – The manifest object to be populated with computed statistics.
s3_files (dict[str, list[str]]) – A nested dictionary mapping each provider to a dictionary of media aliases and their corresponding lists of S3 file paths.
stage (DataStage) – The processing stage that determines how statistics should be computed.
client (Client | None) – Optional Dask client used to parallelize computation, if available.
- Returns:
The updated DataManifest instance with added statistics for each valid alias.
- Return type:
DataManifest
- impresso_essentials.versioning.compute_manifest.remove_corrupted_files(s3_files: dict[str, dict[str, list[str]]]) dict[str, dict[str, list[str]]]
Check if any of the files to consider found on S3 are corrupted or empty.
Corrupted or empty files can cause errors in the later steps of the manifest computation, and this step prevents them. It is optional and adds some overhead to the manifest computation. It also logs any file that raised errors when read by Dask.
- Parameters:
s3_files (dict[str, dict[str, list[str]]]) – S3 archive files that were found and are to be considered.
- Returns:
All non-corrupted/empty archives to use for the manifest.
- Return type:
dict[str, dict[str, list[str]]]
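The corruption check can be approximated locally with the standard `bz2` and `json` modules. This sketch assumes bz2-compressed JSON-lines archives on the local filesystem (the real function reads from S3 through Dask); the helper names are hypothetical.

```python
import bz2
import json
import logging

logger = logging.getLogger(__name__)


def is_readable_jsonl_bz2(path: str) -> bool:
    """True if the archive decompresses, is non-empty, and every non-blank line is valid JSON."""
    try:
        n_lines = 0
        with bz2.open(path, "rt", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    json.loads(line)
                    n_lines += 1
        return n_lines > 0
    except (OSError, EOFError, ValueError) as e:  # bad stream, truncation, bad JSON
        logger.warning("Corrupted or unreadable file %s: %s", path, e)
        return False


def remove_corrupted_sketch(s3_files: dict[str, dict[str, list[str]]]) -> dict[str, dict[str, list[str]]]:
    """Keep only the readable, non-empty archives, preserving the provider -> alias layout."""
    return {
        provider: {alias: [p for p in paths if is_readable_jsonl_bz2(p)] for alias, paths in aliases.items()}
        for provider, aliases in s3_files.items()
    }
```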
- impresso_essentials.versioning.compute_manifest.validate_config(config: dict[str, Any]) dict[str, Any]
Ensure all required configurations are defined, add any missing optional ones.
- Parameters:
config (dict[str, Any]) – Provided configuration dict to compute the manifest.
- Raises:
ValueError – Some required arguments of the configuration are missing.
- Returns:
Updated config, with any missing optional argument set to None.
- Return type:
dict[str, Any]
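A minimal sketch of the validation logic, assuming hypothetical required keys `data_stage` and `s3_bucket`, and treating the filtering options of aliases_to_process as the optional keys (also an assumption). The actual required and optional arguments are those documented in the manifest configuration file.

```python
from typing import Any

REQUIRED_KEYS = ("data_stage", "s3_bucket")  # hypothetical required arguments
OPTIONAL_KEYS = ("providers", "media_aliases", "alias_blacklist")  # assumed optional


def validate_config_sketch(config: dict[str, Any]) -> dict[str, Any]:
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"Missing required configuration arguments: {missing}")
    validated = dict(config)
    for key in OPTIONAL_KEYS:
        validated.setdefault(key, None)  # only fills keys that are absent
    return validated
```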