""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import abc import json import os from collections import defaultdict from dataclasses import dataclass, field from typing import List, NamedTuple, Optional from .db import ( DB, PatchDependencyMatch, VersionMatch, HashState, DefinitionType, ) from .utils import HashCalculator, get_base_dir class FileIdentifier(NamedTuple): rel_path: str hash: str vuln_id: Optional[int] = None vuln_type: Optional[int] = None @dataclass class VersionIdentifier: id: int hash: str file_identifiers: list[FileIdentifier] # one identifier can match multiple base_dirs, need to keep track of them to avoid duplicate scanning matched_base_dirs: set = field(default_factory=set) @dataclass class PatchDependency: files: list[FileIdentifier] @dataclass class HashDefinition: type: DefinitionType id: int hash: str state: HashState class Matcher(abc.ABC): def __init__(self, input_file: str): self.dict_of_identifiers = self._parse_input(input_file) @abc.abstractmethod def _parse_input(self, file_path: str) -> dict[str, list[tuple]]: pass class VersionsMatcher(Matcher): @staticmethod def _parse_path_hash_pairs(file_hashes: str) -> list[FileIdentifier]: # accepts file_hashes string like [<file_hash>|<file_path>|]*n # returns list of FileIdentifier objects parts = file_hashes.strip().split("|") return [ FileIdentifier(rel_path, hash_) for rel_path, hash_ in zip(parts[1::2], parts[::2]) ] def _parse_line(self, line: str) -> Optional[VersionIdentifier]: # each line is made up as <state>:<id>:<reporting_hash>:[file_hashes] # and <file_hashes> is a list of "<file_hash>|<file_path>|" pairs state, id_, hash_, file_hashes = line.strip().split(":") if state != "+": return None return VersionIdentifier( id_, hash_, self._parse_path_hash_pairs(file_hashes) ) def _parse_input( self, file_path: str ) -> dict[str, list[VersionIdentifier]]: # reads file version_identifiers with contents like # +:10831:38ed3878c51c61af938cd4fd9228b23b:ad8d2ec0797fbe584a2f5c1e0985b188|classes/Product.php|e890fa7432bbe7bee4dcbbff1009ca4b|app/AppKernel.php| plugins_identifiers_by_path: dict[ str, list[VersionIdentifier] ] = defaultdict(list) with open(file_path, "r") as file: for line in file: if new_identifier := self._parse_line(line): plugins_identifiers_by_path[ new_identifier.file_identifiers[0].rel_path ].append(new_identifier) return plugins_identifiers_by_path def has_full_match( self, plugin_identifier: VersionIdentifier, base_dir: str, hash_calculator: HashCalculator, ) -> bool: # 1) check that all files from file_identifiers exist in their paths relative to base_dir for file_identifier in plugin_identifier.file_identifiers: if not os.path.isfile( os.path.join(base_dir, file_identifier.rel_path) ): return False # 2) all files exist, now check their hashes for file_identifier in 
class PatchDependenciesMatcher(Matcher):
    def _parse_input(self, file_path: str) -> dict[str, list[PatchDependency]]:
        # reads the patch_dependencies file
        # each line represents a patch dependency and is made of a list of
        # FileToPatch objects, like:
        # +[{"hash": "(...)", "checksum": "(...)", "vulnerability_type": 10, "vulnerability_id": 4346, \
        #    "filename": "popup-builder/com/helpers/AdminHelper.php"}, \
        #   {"hash": "(...)", "checksum": "(...)", "vulnerability_type": 10, "vulnerability_id": 4347, \
        #    "filename": "popup-builder/com/classes/Ajax.php"}]
        # we should consider only those lines starting with "+"
        patch_deps: dict[str, list[PatchDependency]] = defaultdict(list)
        with open(file_path, "r") as file:
            for line in file:
                state, data = line[0], line[1:]
                if state != "+":
                    continue
                patch_dependency = PatchDependency(
                    files=[
                        FileIdentifier(
                            rel_path=entry["filename"],
                            hash=entry["hash"],
                            vuln_id=entry["vulnerability_id"],
                            vuln_type=entry["vulnerability_type"],
                        )
                        for entry in json.loads(data)
                    ]
                )
                for file_identifier in patch_dependency.files:
                    patch_deps[file_identifier.rel_path].append(
                        patch_dependency
                    )
        return patch_deps

    def match_and_save(
        self,
        full_path: str,
        relative_path: str,
        db: DB,
        hash_calculator: HashCalculator,
    ):
        is_matched = False
        for patch_dependency in self.dict_of_identifiers.get(
            relative_path, []
        ):
            base_dir = get_base_dir(full_path, relative_path)
            # for each matching file add a PatchDependencyMatch to the db;
            # if all files of patch_dependency are found, set
            # dependencies_met=True on all of them
            matches_to_insert = []  # [(path, hash, vuln_id, vuln_type), ...]
            for file_identifier in patch_dependency.files:
                if (
                    file_identifier.rel_path == relative_path
                    and hash_calculator.calc_hash(
                        os.path.join(base_dir, file_identifier.rel_path),
                    )
                    == file_identifier.hash
                ):
                    # todo: fix duplicates in the PatchDependencyMatch table:
                    # add a constraint in the table and make a common dict for
                    # all the file_identifiers to eliminate duplicates in ram
                    matches_to_insert.append(
                        (
                            os.path.join(base_dir, file_identifier.rel_path),
                            file_identifier.hash,
                            file_identifier.vuln_id,
                            file_identifier.vuln_type,
                        )
                    )
                    is_matched = True
            # if all files matched, set dependencies_met=True
            dependencies_met = len(matches_to_insert) == len(
                patch_dependency.files
            )
            for row in matches_to_insert:
                db.patch_dependencies.buffered_insert(
                    PatchDependencyMatch(*row, dependencies_met=dependencies_met)
                )
        return is_matched
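
# Illustrative sketch (a minimal example, not shipped code): _parse_input
# above indexes every PatchDependency under each member file's rel_path, so a
# two-file dependency is reachable from either path and both dict entries
# point at the same object. The constructor argument is a hypothetical
# stand-in, assumed to contain the two-file example line from the docstring.
#
#   matcher = PatchDependenciesMatcher("patch_dependencies")
#   deps_helpers = matcher.dict_of_identifiers[
#       "popup-builder/com/helpers/AdminHelper.php"
#   ]
#   deps_ajax = matcher.dict_of_identifiers[
#       "popup-builder/com/classes/Ajax.php"
#   ]
#   assert deps_helpers[0] is deps_ajax[0]
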
class HashesMatcher:
    def __init__(self, hashes_file: str):
        self.hash_records: List[HashDefinition] = self._parse_input(
            hashes_file
        )
        self._seen = set()

    @staticmethod
    def _parse_input(hashes_path: str) -> List[HashDefinition]:
        """
        Parses the hashes file and returns a list of HashDefinition,
        filtering out malware-related types and state==2.

        The lines look like <type>:<id>:<hash>:<state>

        Example:
            2:675:ab43f2f7ad32404e1b923a8387f1a167:2

        Where :code:`type` can be one of the following:

        * DEFINITION_TYPE_MALWARE = 1
        * DEFINITION_TYPE_VULNERABILITY = 2
        * DEFINITION_TYPE_APPLICATION = 3
        * DEFINITION_TYPE_DRYRUN = 4
        * DEFINITION_TYPE_MALWARE_RULE = 7
        * DEFINITION_TYPE_MALWARE_RULE_DRYRUN = 8
        * DEFINITION_TYPE_VULNERABILITY_ECOMMERCE = 9
        * DEFINITION_TYPE_VULNERABILITY_PLUGIN = 10
        """
        MALWARE_TYPES = {
            DefinitionType.MALWARE.value,
            DefinitionType.MALWARE_RULE.value,
            DefinitionType.MALWARE_RULE_DRYRUN.value,
        }
        result = []
        with open(hashes_path, "r") as f:
            for line in f:
                parts = line.strip().split(":")
                if len(parts) != 4:
                    continue
                typ, id_, hash_, state = parts
                typ_int = int(typ)
                state_int = int(state)
                if (
                    typ_int in MALWARE_TYPES
                    or state_int == HashState.SUPERSEDED.value
                ):
                    continue
                result.append(
                    HashDefinition(
                        type=DefinitionType(typ_int),
                        id=int(id_),
                        hash=hash_,
                        state=HashState(state_int),
                    )
                )
        return result

    def match_and_save(self, file_path, relative_path, db, hash_calculator):
        file_hash = hash_calculator.calc_hash(file_path)
        for record in self.hash_records:
            key = (
                file_path,
                file_hash,
                record.type.value,
                record.id,
                record.state.value,
            )
            if file_hash == record.hash and key not in self._seen:
                db.hashes_matches.buffered_insert(key)
                self._seen.add(key)
                return True  # stop after first match
        return False
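
# Illustrative driver (a minimal sketch, not part of this module): how a
# filesystem walk might feed all three matchers. walk_files(), db, and the
# definition-file paths are hypothetical stand-ins; only the matcher classes
# and HashCalculator come from this file and its imports.
#
#   hash_calculator = HashCalculator()
#   matchers = [
#       VersionsMatcher("version_identifiers"),          # hypothetical path
#       PatchDependenciesMatcher("patch_dependencies"),  # hypothetical path
#       HashesMatcher("hashes"),                         # hypothetical path
#   ]
#   for full_path, relative_path in walk_files("/var/www"):  # hypothetical helper
#       for matcher in matchers:
#           matcher.match_and_save(full_path, relative_path, db, hash_calculator)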