aind_data_asset_indexer package

Submodules

aind_data_asset_indexer.aind_bucket_indexer module

Module to handle syncing changes from DocDb to S3.

class aind_data_asset_indexer.aind_bucket_indexer.AindIndexBucketJob(job_settings: AindIndexBucketJobSettings)

Bases: object

This job will: 1) Loop through the records in DocDb filtered by bucket. If the record does not have valid location, it will log a warning and not process it further. 2.0) For each record, check if the S3 location exists. If the S3 location does not exist, then remove the record from DocDB. 2.1) If the S3 location exists, resolve the core schema json files in the root folder and the original_metadata folder to ensure they are in sync. 2.1.1) Then compare the md5 hashes. If they are different, overwrite the record in S3 with the record from DocDb. Otherwise, do nothing. 3) Scan through each prefix in S3. 4) For each prefix, check if it is in DocDB. 4.1) If already in DocDb, then don’t do anything. 4.2) If a metadata record does not exist and the asset is derived, then register it to DocDB. Assume a subsequent docdb sync job will resolve the original metadata folder and core files as in step 2.1.

run_job()

Main method to run.

aind_data_asset_indexer.codeocean_bucket_indexer module

Module to index Code Ocean processed results and update external links in DocDB.

class aind_data_asset_indexer.codeocean_bucket_indexer.CodeOceanIndexBucketJob(job_settings: CodeOceanIndexBucketJobSettings)

Bases: object

This job will: 1) For records in AIND buckets, update the external links with Code Ocean data asset ids if needed. 2) Download all processed results records from the Code Ocean index 3) Download all the records in DocDB for the Code Ocean bucket. The response is projected to just the {_id, location} fields. 4) Creates a list of locations found in Code Ocean and a list of locations found in DocDB. 5) For locations found in Code Ocean not in DocDB, a new record will be created from the aind-data-schema json files in S3. 6) For locations in DocDB not found in Code Ocean, the records will be removed from DocDB.

run_job()

Main method to run.

aind_data_asset_indexer.index_aind_buckets module

Module to sync list of buckets with DocDb.

class aind_data_asset_indexer.index_aind_buckets.IndexAindBucketsJob(job_settings: AindIndexBucketsJobSettings)

Bases: object

Job to sync records in DocDb to list of aind buckets.

run_job()

Main job runner.

aind_data_asset_indexer.models module

Module to hold job settings models

class aind_data_asset_indexer.models.AindIndexBucketJobSettings(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_prefix_target: EnvPrefixTarget | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | Literal['dual', 'toggle'] | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, _build_sources: tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None = None, *, s3_bucket: str, n_partitions: int = 20, lookback_days: int | None = None, copy_original_md_subdir: str = 'original_metadata', doc_db_host: str, doc_db_db_name: str | None, doc_db_collection_name: str | None, run_docdb_sync: bool = True, run_s3_sync: bool = True)

Bases: IndexJobSettings

Aind Index Bucket Job Settings

doc_db_collection_name: str | None
doc_db_db_name: str | None
doc_db_host: str
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

run_docdb_sync: bool
run_s3_sync: bool
class aind_data_asset_indexer.models.AindIndexBucketsJobSettings(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_prefix_target: EnvPrefixTarget | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | Literal['dual', 'toggle'] | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, _build_sources: tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None = None, *, s3_bucket: None = None, n_partitions: int = 20, lookback_days: int | None = None, copy_original_md_subdir: str = 'original_metadata', doc_db_host: str, doc_db_db_name: str | None, doc_db_collection_name: str | None, run_docdb_sync: bool = True, run_s3_sync: bool = True, s3_buckets: List[str])

Bases: AindIndexBucketJobSettings

Job Settings to sync docdb with list of aind managed buckets.

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

s3_bucket: None
s3_buckets: List[str]
class aind_data_asset_indexer.models.CodeOceanIndexBucketJobSettings(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_prefix_target: EnvPrefixTarget | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | Literal['dual', 'toggle'] | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, _build_sources: tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None = None, *, s3_bucket: str, n_partitions: int = 20, lookback_days: int | None = None, copy_original_md_subdir: str = 'original_metadata', doc_db_host: str, doc_db_db_name: str | None, doc_db_collection_name: str | None, codeocean_domain: str, codeocean_token: SecretStr, run_co_sync: bool = True)

Bases: IndexJobSettings

Code Ocean Index Bucket Job Settings

codeocean_domain: str
codeocean_token: SecretStr
doc_db_collection_name: str | None
doc_db_db_name: str | None
doc_db_host: str
classmethod from_param_store(param_store_name: str)

Construct class from aws param store and secrets manager

Parameters:

param_store_name (str)

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

run_co_sync: bool
class aind_data_asset_indexer.models.IndexJobSettings(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_prefix_target: EnvPrefixTarget | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | Literal['dual', 'toggle'] | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, _build_sources: tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None = None, *, s3_bucket: str, n_partitions: int = 20, lookback_days: int | None = None, copy_original_md_subdir: str = 'original_metadata')

Bases: BaseSettings

Basic Index Job Settings

copy_original_md_subdir: str
classmethod from_param_store(param_store_name: str)

Construct class from aws param store

Parameters:

param_store_name (str)

lookback_days: int | None
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

n_partitions: int
s3_bucket: str
class aind_data_asset_indexer.models.PopulateAindBucketsJobSettings(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_prefix_target: EnvPrefixTarget | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | Literal['dual', 'toggle'] | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, _build_sources: tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]] | None = None, *, s3_bucket: None = None, n_partitions: int = 20, lookback_days: int | None = None, copy_original_md_subdir: str = 'original_metadata', s3_buckets: List[str])

Bases: IndexJobSettings

Job Settings to populate a list of aind managed buckets with metadata.nd.json files

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'env_prefix_target': 'variable', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

s3_bucket: None
s3_buckets: List[str]

aind_data_asset_indexer.populate_aind_buckets module

Module to populate list of buckets with metadata.nd.json files.

class aind_data_asset_indexer.populate_aind_buckets.PopulateAindBucketsJob(job_settings: PopulateAindBucketsJobSettings)

Bases: object

Job to populate a list of aind buckets with metadata json files and copy original core schema jsons to a subfolder.

run_job()

Main job runner.

aind_data_asset_indexer.populate_s3_with_metadata_files module

Module to handle populating s3 bucket with metadata files.

class aind_data_asset_indexer.populate_s3_with_metadata_files.AindPopulateMetadataJsonJob(job_settings: IndexJobSettings)

Bases: object

This job will: 1) Crawl through an S3 bucket 2) Look inside each prefix that adheres to data asset naming convention 3) If the name is a data asset name, then it will look inside the prefix 4) It will create a metadata.nd.json by using any of the core json files it finds. Any existing metadata.nd.json will be overwritten. 5.1) The contents of any existing core json files will be copied to /original_metadata/{core_schema}.{date_stamp}.json. 5.2) The core json files will be overwritten with the new fields from metadata.nd.json or deleted if they are not found in metadata.nd.json.

run_job()

Main method to run. This will: 1) Iterate through prefixes in s3, 1000 at a time 2) Divvy up the 1000 prefixes across dask n_partitions 3) Process each prefix in each set in each partition

aind_data_asset_indexer.utils module

Package for common methods used such as interfacing with S3 and DocDB.

aind_data_asset_indexer.utils.build_metadata_record_from_prefix(bucket: str, prefix: str, s3_client: S3Client, optional_name: str | None = None, optional_created: datetime | None = None, optional_external_links: Dict[str, List[str]] | None = None) str | None

For a given bucket and prefix, this method will return a JSON string representation of a Metadata record. The Metadata record will be constructed from any non-corrupt core schema json files found under the prefix. If there are issues with Metadata construction, then it will return None.

Parameters:
  • bucket (str)

  • prefix (str)

  • s3_client (S3Client)

  • optional_name (Optional[str]) – If optional_name is None, then a name will be constructed from the s3_prefix. Default is None.

  • optional_created (Optional[datetime]) – User can override created datetime. Default is None.

  • optional_external_links (Optional[Dict[str, List[str]]]) – User can provide external_links. Default is None.

Returns:

The constructed Metadata record as a json string. Will return None if there are issues with Metadata construction.

Return type:

Optional[str]

aind_data_asset_indexer.utils.compute_md5_hash(json_contents: str) str

Computes the md5 hash of the object as it would be stored in S3. Useful for comparing against the S3 object e-tag to check if they are the same.

Parameters:

json_contents (str) – JSON string representation of an object.

Returns:

The md5 hash of the object as it would be uploaded to S3.

Return type:

str

aind_data_asset_indexer.utils.cond_copy_then_sync_core_json_files(metadata_json: str, bucket: str, prefix: str, s3_client: S3Client, copy_original_md_subdir: str) None

For a given bucket and prefix 1) Copy the core schema files if a copy does not already exist. 2) Sync the core schema files with core fields from the metadata record. If the original core schema json was corrupt, then it will be deleted after its original contents are copied.

Parameters:
  • metadata_json (str) – The JSON string representation of the Metadata record.

  • bucket (str) – The name of the S3 bucket.

  • prefix (str) – The prefix for the S3 object keys.

  • s3_client (S3Client) – The S3 client object.

  • copy_original_md_subdir (str) – Subdirectory to copy original core schema json files to. Default is ‘original_metadata’.

Return type:

None

aind_data_asset_indexer.utils.copy_core_json_files(bucket: str, prefix: str, s3_client: S3Client, copy_original_md_subdir: str) None

For a given bucket and prefix, copy the core schema files to a sub-directory.

Parameters:
  • bucket (str) – The name of the S3 bucket.

  • prefix (str) – The prefix for the S3 object keys.

  • s3_client (S3Client) – The S3 client object.

  • copy_original_md_subdir (str) – Subdirectory to copy original core schema json files to. For example, ‘original_metadata’.

Return type:

None

aind_data_asset_indexer.utils.create_metadata_object_key(prefix: str) str

For a given s3 prefix, create the expected object key for the metadata.nd.json file.

Parameters:

prefix (str) – For example, ecephys_123456_2020-10-10_01-02-03

Returns:

For example, ecephys_123456_2020-10-10_01-02-03/metadata.nd.json

Return type:

str

aind_data_asset_indexer.utils.create_object_key(prefix: str, filename: str) str

For a given s3 prefix and filename, create the expected object key for the file.

Parameters:
  • prefix (str) – For example, ecephys_123456_2020-10-10_01-02-03

  • filename (str) – For example, ‘metadata.nd.json’

Returns:

For example, ecephys_123456_2020-10-10_01-02-03/metadata.nd.json

Return type:

str

aind_data_asset_indexer.utils.does_s3_metadata_copy_exist(s3_client: S3Client, bucket: str, prefix: str, copy_subdir: str)

For a given bucket and prefix, check if there are any original core schema jsons in the copy_subdir. Uses the list_objects operation.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • prefix (str) – For example, ecephys_123456_2020-10-10_01-02-03

  • copy_subdir (str) – For example, original_metadata

Returns:

True if any of the core schema jsons exists in the copy_subdir, otherwise False.

Return type:

bool

aind_data_asset_indexer.utils.does_s3_object_exist(s3_client: S3Client, bucket: str, key: str) bool

Check that a file exists inside a bucket. Uses the head_object operation, which is cheaper compared to the list_objects operation.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • key (str) – For example, behavior_655019_2020-10-10_01-00-23/subject.json

Returns:

True if the file exists, otherwise False.

Return type:

bool

aind_data_asset_indexer.utils.does_s3_prefix_exist(s3_client: S3Client, bucket: str, prefix: str) bool

Check that a prefix exists inside a bucket.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • prefix (str) – For example, behavior_655019_2020-10-10_01-00-23

Returns:

True if the prefix exists, otherwise False.

Return type:

bool

aind_data_asset_indexer.utils.download_json_file_from_s3(s3_client: S3Client, bucket: str, object_key: str) dict | None

Downloads json file contents from S3. Will return None if object is not a valid json file.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • object_key (str)

Return type:

dict | None

aind_data_asset_indexer.utils.get_all_processed_codeocean_asset_records(co_client: CodeOcean, co_data_asset_bucket: str) Dict[str, dict]

Gets all the data asset records we’re interested in indexing. The location field in the output is the expected location of the data asset. It may still require double-checking that the s3 location is valid.

Parameters:
  • co_client (CodeOcean)

  • co_data_asset_bucket (str) – Name of Code Ocean’s data asset bucket

Returns:

{
data_asset_location: {

“name”: data_asset_name, “location”: data_asset_location, “co_asset_id”: data_asset_id, “co_computation_id”: data_asset_computation_id,

}

}

Return type:

Dict[str, dict]

aind_data_asset_indexer.utils.get_dict_of_core_schema_file_info(s3_client: S3Client, bucket: str, prefix: str) Dict[str, dict | None]

For a bucket and prefix get list of core schema file info.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • prefix (str)

Returns:

{“subject.json”: {“last_modified”: datetime, “e_tag”: str, “version_id”: str}, “procedures.json”: {“last_modified”: datetime, “e_tag”: str, “version_id”: str}, … }

Return type:

Dict[str, Optional[dict]]

aind_data_asset_indexer.utils.get_dict_of_file_info(s3_client: S3Client, bucket: str, keys: List[str]) Dict[str, dict | None]

For a list of object keys, returns a list of metadata info for each object that exists in the bucket.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • keys (List[str])

Returns:

Shape of dict is {“last_modified”: datetime, “e_tag”: str, “version_id”: str}

Return type:

Dict[str, Optional[dict]]

aind_data_asset_indexer.utils.is_prefix_valid(prefix: str) bool

Check if a given prefix is valid. A valid prefix conforms to a regex pattern defined in aind-data-schema.

Parameters:

prefix (str) – For example, ‘ecephys_123456_2020-10-10_01-02-03’

Returns:

True if the prefix is valid, otherwise False.

Return type:

bool

aind_data_asset_indexer.utils.is_record_location_valid(record: dict, expected_bucket: str, expected_prefix: str | None = None) bool

Check if a given record has a valid location url.

Parameters:
  • record (dict) – Metadata record as a dictionary

  • expected_bucket (str) – The expected s3 bucket the location should have.

  • expected_prefix (Optional[str]) – If provided, also check that the record location matches the expected s3_prefix. Default is None, which won’t perform the check.

Returns:

True if there is a location field and the url in the field has a form like ‘s3://{expected_bucket}/prefix’ Will return False if there is no s3 scheme, the bucket does not match the expected bucket, the prefix contains forward slashes, or does not match the expected prefix. If the record name does not match the prefix, a warning is logged, but the method will still return True.

Return type:

bool

aind_data_asset_indexer.utils.iterate_through_top_level(s3_client: S3Client, bucket: str, max_pages: int | None = None) Iterator[List[str]]

Returns an iterator of s3 responses. If prefix is None, then will return an iterator of top-level prefixes of a bucket. Otherwise, will return an iterator of the top level items under a prefix.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • max_pages (Optional[int]) – Number of pages to return. None returns maximum number allowed.

Returns:

Returns an iterator. Each object in the iterator is a list of up to 1000 prefixes in a bucket.

Return type:

Iterator[List[str]]

aind_data_asset_indexer.utils.list_metadata_copies(s3_client: S3Client, bucket: str, prefix: str, copy_subdir: str) List[str]

For a given bucket and prefix, return a list of the core schemas in the copy_subdir.

Parameters:
  • s3_client (S3Client)

  • bucket (str)

  • prefix (str) – For example, ecephys_123456_2020-10-10_01-02-03

  • copy_subdir (str) – For example, original_metadata

Returns:

A list of the core schemas in the copy_subdir without timestamp, e.g, [“subject”, “procedures”, “processing”]

Return type:

List[str]

aind_data_asset_indexer.utils.sync_core_json_files(metadata_json: str, bucket: str, prefix: str, s3_client: S3Client) None

Sync the core schema files with the core fields from metadata.nd.json. Core schema jsons are only updated if their contents are outdated. Core schema jsons are created if they don’t already exist. If a core field is None in metadata.nd.json but the core schema json exists in s3, then the core schema json will be deleted.

Parameters:
  • metadata_json (str) – The JSON string representation of the Metadata record.

  • bucket (str) – The name of the S3 bucket.

  • prefix (str) – The prefix for the S3 object keys.

  • s3_client (S3Client) – The S3 client object.

Return type:

None

aind_data_asset_indexer.utils.upload_json_str_to_s3(bucket: str, object_key: str, json_str: str, s3_client: S3Client) PutObjectOutputTypeDef

Upload JSON string contents to a location in S3.

Parameters:
  • bucket (str) – For example, ‘aind-open-data’

  • object_key (str) – For example, ‘prefix/original_metadata/subject.json’

  • json_str (str) – JSON string to upload as JSON file.

  • s3_client (S3Client)

Returns:

Response of the put object operation.

Return type:

PutObjectOutputTypeDef

aind_data_asset_indexer.utils.upload_metadata_json_str_to_s3(bucket: str, metadata_json: str, prefix: str, s3_client: S3Client) PutObjectOutputTypeDef

Upload JSON string representation of the contents of the metadata.nd.json file to a location in S3.

Parameters:
  • bucket (str)

  • metadata_json (str)

  • prefix (str)

  • s3_client (S3Client)

Returns:

Response of the put object operation.

Return type:

PutObjectOutputTypeDef

Module contents

Package