import os
from typing import List, Optional, Type, Tuple
import yaml
from credsweeper.common.constants import RuleType, MIN_VARIABLE_LENGTH, MIN_SEPARATOR_LENGTH, MIN_VALUE_LENGTH, \
MAX_LINE_LENGTH, Separator, DEFAULT_ENCODING
from credsweeper.config import Config
from credsweeper.credentials import Candidate
from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.logger.logger import logging
from credsweeper.rules import Rule
from credsweeper.scanner.scan_type import MultiPattern, PemKeyPattern, ScanType, SinglePattern
class Scanner:
    """Advanced Credential Scanner base class.

    Parameters:
        rules: list of rule objects to check
        min_pattern_len: minimal length specified in all pattern rules
        min_keyword_len: minimal possible length for a string to be matched by any keyword rule
        min_len: Smallest between min_pattern_len and min_keyword_len
        TargetGroup: Type for List[Tuple[AnalysisTarget, str, int]]

    """

    # Each entry is (target, stripped and lower-cased line, length of the stripped line)
    TargetGroup = List[Tuple[AnalysisTarget, str, int]]

    def __init__(self, config: Config, rule_path: Optional[str]) -> None:
        """Load the rules and resolve the scanner class used by each of them.

        Args:
            config: configuration object handed to every Rule and to every scanner run
            rule_path: path to a YAML file with rule templates; None selects the default rule file

        """
        self.config = config
        self._set_rules(rule_path)
        # Resolve the scanner class once per rule so `scan` only needs a dict lookup
        self.__scanner_for_rule = {rule.rule_name: self.get_scanner(rule) for rule in self.rules}

    def _set_rules(self, rule_path: Optional[str]) -> None:
        """Read rule templates from a YAML file and compute the minimal line lengths.

        Sets `rules`, `min_pattern_len`, `min_keyword_len` and `min_len`.

        Args:
            rule_path: path to a YAML file with rule templates; when None, "rules/config.yaml" located
                under the parent directory of the directory holding this module is used

        """
        self.rules: List[Rule] = []
        if rule_path is None:
            parent_dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
            rule_path = os.path.join(parent_dir_path, "rules", "config.yaml")
        with open(rule_path, "r", encoding=DEFAULT_ENCODING) as f:
            # NOTE(review): yaml.Loader is able to construct arbitrary Python objects. `rule_path` may be
            # supplied by the caller, so consider yaml.safe_load if rule files never rely on python tags.
            rule_templates = yaml.load(f, Loader=yaml.Loader)
        self.rules.extend(Rule(self.config, rule_template) for rule_template in rule_templates)

        # 999 is the starting upper bound; it stays in effect when no pattern rule declares a smaller
        # `min_line_len` (including the case when there are no pattern rules at all)
        pattern_line_lens = [rule.min_line_len for rule in self.rules if rule.rule_type == RuleType.PATTERN]
        self.min_pattern_len = min([999, *pattern_line_lens])
        self.min_keyword_len = MIN_VARIABLE_LENGTH + MIN_SEPARATOR_LENGTH + MIN_VALUE_LENGTH
        self.min_len = min(self.min_keyword_len, self.min_pattern_len)

    def _select_and_group_targets(self, targets: List[AnalysisTarget]) -> Tuple[TargetGroup, TargetGroup, TargetGroup]:
        """Group targets into 3 lists based on loaded rules.

        A target may end up in several groups, or in none of them.

        Args:
            targets: List of AnalysisTarget to analyze

        Return:
            Three TargetGroup objects: one for keywords, one for patterns, and one for PEM keys

        """
        keyword_targets = []
        pattern_targets = []
        pem_targets = []

        for target in targets:
            # Ignore target if it's too long
            if len(target.line) > MAX_LINE_LENGTH:
                continue

            # Trim string from outer spaces to make future `a in str` checks faster
            target_line_trimmed = target.line.strip()
            target_line_trimmed_len = len(target_line_trimmed)

            # Ignore target if trimmed part is too short
            if target_line_trimmed_len < self.min_len:
                continue

            target_line_trimmed_lower = target_line_trimmed.lower()
            # The same precomputed tuple is shared by every group the target belongs to
            entry = (target, target_line_trimmed_lower, target_line_trimmed_len)

            # Check if have at least one separator character. Otherwise cannot be matched by a keyword
            if any(x in target_line_trimmed for x in Separator.common_as_set):
                keyword_targets.append(entry)

            # Check if have length not smaller than smallest `min_line_len` in all pattern rules
            if target_line_trimmed_len >= self.min_pattern_len:
                pattern_targets.append(entry)

            # Check if have "BEGIN" substring. Cannot otherwise be matched as a PEM key.
            # The check uses the original-case line: the lower-cased copy can never contain "BEGIN"
            if "BEGIN" in target_line_trimmed:
                pem_targets.append(entry)

        return keyword_targets, pattern_targets, pem_targets

    def scan(self, targets: List[AnalysisTarget]) -> List[Candidate]:
        """Run scanning of list of target lines from 'targets' with set of rule from 'self.rules'.

        Args:
            targets: objects with data to analyse: line, line number,
                filepath and all lines in file

        Return:
            list of all detected credential candidates in analysed targets

        """
        credentials = []
        keyword_targets, pattern_targets, pem_targets = self._select_and_group_targets(targets)

        for rule in self.rules:
            min_line_len = rule.min_line_len
            required_substrings = rule.required_substrings
            scanner = self.__scanner_for_rule[rule.rule_name]
            to_check = self.get_targets_to_check(keyword_targets, pattern_targets, pem_targets, rule)

            # It is almost two times faster to precompute values related to target_line than to compute them in
            # each iteration
            for target, target_line_trimmed_lower, target_line_trimmed_len in to_check:
                if target_line_trimmed_len < min_line_len:
                    continue
                # Skip the target unless it contains at least one of the rule's required substrings
                if not any(substring in target_line_trimmed_lower for substring in required_substrings):
                    continue

                new_credential = scanner.run(self.config, target.line, target.line_num, target.file_path, rule,
                                             target.lines)
                if new_credential:
                    logging.debug(
                        f"Credential for rule: {rule.rule_name}"
                        f" in file: {target.file_path}:{target.line_num} in line: {target.line}"
                    )
                    credentials.append(new_credential)

        return credentials

    @classmethod
    def get_scanner(cls, rule: Rule) -> Type[ScanType]:
        """Choose type of scanner base on rule affiliation.

        Args:
            rule: rule object used to scanning

        Return:
            depending on the rule type, returns the corresponding scanner class

        Raises:
            ValueError: if `rule.pattern_type` is not one of the known pattern types

        """
        if rule.pattern_type == Rule.SINGLE_PATTERN:
            return SinglePattern
        elif rule.pattern_type == Rule.MULTI_PATTERN:
            return MultiPattern
        elif rule.pattern_type == Rule.PEM_KEY_PATTERN:
            return PemKeyPattern
        raise ValueError(f"Unknown pattern_type in rule: {rule.pattern_type}")

    @staticmethod
    def get_targets_to_check(keyword_targets: TargetGroup, pattern_targets: TargetGroup, pem_targets: TargetGroup,
                             rule: Rule) -> TargetGroup:
        """Choose target subset based on a rule.

        Args:
            keyword_targets: TargetGroup with targets relevant to a keyword based rules
            pattern_targets: TargetGroup with targets relevant to a pattern based rules
            pem_targets: TargetGroup with targets relevant to a pem key rules
            rule: rule object used to scanning

        Return:
            depending on the rule type, returns one of the other arguments

        Raises:
            ValueError: if `rule.rule_type` is not one of the known rule types

        """
        if rule.rule_type == RuleType.KEYWORD:
            return keyword_targets
        elif rule.rule_type == RuleType.PATTERN:
            return pattern_targets
        elif rule.rule_type == RuleType.PEM_KEY:
            return pem_targets
        else:
            raise ValueError(f"Unknown RuleType {rule.rule_type}")