Source code for credsweeper.validations.apply_validation

from multiprocessing import Pool
from typing import List

from credsweeper.common.constants import KeyValidationOption
from credsweeper.credentials import Candidate, CredentialManager
from credsweeper.logger.logger import logging
from credsweeper.validations.validation import Validation


[docs]class ApplyValidation: """Class that allow parallel API validation using already declared pool."""
[docs] def validate_credentials(self, pool: Pool, credential_manager: CredentialManager) -> None: old_cred: List[Candidate] = credential_manager.get_credentials() new_cred = [] validations: List[KeyValidationOption] = pool.map(self.validate, old_cred) for cred, validation in zip(old_cred, validations): cred.api_validation = validation new_cred.append(cred) credential_manager.set_credentials(new_cred)
[docs] def validate(self, cred: Candidate) -> KeyValidationOption: """Iterate over all `validations` in current cred. If any validation results in VALIDATED_KEY - final result is VALIDATED_KEY If no VALIDATED_KEY, but at least one INVALID_KEY - final result is INVALID_KEY UNDECIDED otherwise """ validation_option = KeyValidationOption.UNDECIDED if not cred.is_api_validation_available: logging.debug( f"No validation with external API available for current credential candidate: " f"{cred.line_data_list[0].line}" ) return KeyValidationOption.NOT_AVAILABLE validation: Validation for validation in cred.validations: current_api_validation: KeyValidationOption = validation.verify(cred.line_data_list) if current_api_validation is KeyValidationOption.VALIDATED_KEY: logging.debug( f"Valid validation by: {validation.__class__.__name__} for line: {cred.line_data_list[0].line}") validation_option = current_api_validation break if current_api_validation is KeyValidationOption.INVALID_KEY: logging.debug( f"Invalid validation by: {validation.__class__.__name__} for line: {cred.line_data_list[0].line}") validation_option = current_api_validation return validation_option