Skip to content

Commit

Permalink
Add support for extract paths being files (#244)
Browse files Browse the repository at this point in the history
extractPaths given to Surfactant in a specimen config file can now point to individual files.
Also fixed a  MACH-O check that caused crashes for unrecognized file types.
  • Loading branch information
nightlark committed Oct 7, 2024
1 parent 43009a2 commit a795327
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 108 deletions.
205 changes: 99 additions & 106 deletions surfactant/cmd/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@

# Converts from a true path to an install path
def real_path_to_install_path(root_path: str, install_path: str, filepath: str) -> str:
return re.sub("^" + root_path + "/", install_path, filepath)
# appending a "/" to root_path can cause problems if it is "" or already ends with "/"
if root_path != "" and not root_path.endswith("/"):
return re.sub("^" + root_path + "/", install_path, filepath)
return re.sub("^" + root_path, install_path, filepath)


def get_software_entry(
Expand All @@ -38,10 +41,14 @@ def get_software_entry(
include_all_files=False,
) -> Tuple[Software, List[Software]]:
sw_entry = Software.create_software_from_file(filepath)
if root_path and install_path:
if root_path is not None and install_path is not None:
sw_entry.installPath = [real_path_to_install_path(root_path, install_path, filepath)]
if root_path and container_uuid:
sw_entry.containerPath = [re.sub("^" + root_path, container_uuid, filepath)]
if root_path is not None and container_uuid is not None:
# make sure there is a "/" separating container uuid and the filepath
if root_path != "" and not root_path.endswith("/"):
sw_entry.containerPath = [re.sub("^" + root_path, container_uuid, filepath)]
else:
sw_entry.containerPath = [re.sub("^" + root_path, container_uuid + "/", filepath)]
sw_entry.recordedInstitution = user_institution_name
sw_children = []

Expand Down Expand Up @@ -125,26 +132,35 @@ def print_input_formats(ctx, _, value):
ctx.exit()


def warn_if_hash_collision(soft1: Optional[Software], soft2: Optional[Software]):
if not soft1 or not soft2:
return
# A hash collision occurs if one or more but less than all hashes match or
# any hash matches but the filesize is different
collision = False
if soft1.sha256 == soft2.sha256 or soft1.sha1 == soft2.sha1 or soft1.md5 == soft2.md5:
# Hashes can be None; make sure they aren't before checking for inequality
if soft1.sha256 and soft2.sha256 and soft1.sha256 != soft2.sha256:
collision = True
elif soft1.sha1 and soft2.sha1 and soft1.sha1 != soft2.sha1:
collision = True
elif soft1.md5 and soft2.md5 and soft1.md5 != soft2.md5:
collision = True
elif soft1.size != soft2.size:
collision = True
if collision:
logger.warning(
f"Hash collision between {soft1.name} and {soft2.name}; unexpected results may occur"
)
def determine_install_prefix(
entry: Optional[ContextEntry] = None,
extract_path: Optional[Union[str, pathlib.Path]] = None,
skip_extract_path: bool = False,
) -> Optional[str]:
"""Determine the install prefix based on what is provided in the context entry, and the extract path for the file.
Args:
entry (Optional[ContextEntry]): The context entry to check for an install prefix.
extract_path (Optional[str|pathlib.Path]): The extract path for the file to use as a potential fallback.
skip_extract_path (bool): Whether the extract_path should be skipped if the entry does not specify an installPrefix.
Returns:
Optional[str]: The install prefix to use, or 'NoneType' if an install path shouldn't be listed.
"""
install_prefix = None
if entry.installPrefix or entry.installPrefix == "":
install_prefix = entry.installPrefix
elif not skip_extract_path:
# pathlib doesn't include the trailing slash
epath = pathlib.Path(extract_path)
if epath.is_file():
install_prefix = epath.parent.as_posix() if len(epath.parts) > 1 else ""
else:
install_prefix = epath.as_posix()
# add a trailing slash after last directory name
if install_prefix != "" and not install_prefix.endswith("/"):
install_prefix += "/"
return install_prefix


def get_default_from_config(option: str, fallback: Optional[Any] = None) -> Any:
Expand Down Expand Up @@ -264,9 +280,7 @@ def sbom(
else:
# Emulate a configuration file with the path
config = []
config.append({})
config[0]["extractPaths"] = [config_file]
config[0]["installPrefix"] = config_file
config.append({"extractPaths": [config_file], "installPrefix": config_file})

# quit if invalid path found
if not validate_config(config):
Expand Down Expand Up @@ -305,7 +319,10 @@ def sbom(
user_institution_name=recorded_institution,
)
archive_entry = new_sbom.find_software(parent_entry.sha256)
warn_if_hash_collision(archive_entry, parent_entry)
if Software.check_for_hash_collision(archive_entry, parent_entry):
logger.warning(
f"Hash collision between {archive_entry.name} and {parent_entry.name}; unexpected results may occur"
)
if archive_entry:
parent_entry = archive_entry
else:
Expand All @@ -332,24 +349,55 @@ def sbom(
entry.installPrefix = entry.installPrefix.replace("\\", "\\\\")

for epath in entry.extractPaths:
# extractPath should not end with "/" (Windows-style backslash paths shouldn't be used at all)
if epath.endswith("/"):
epath = epath[:-1]
logger.trace("Extracted Path: " + str(epath))
# convert to pathlib.Path, ensures trailing "/" won't be present and some more consistent path formatting
epath = pathlib.Path(epath)
install_prefix = determine_install_prefix(
entry, epath, skip_extract_path=skip_install_path
)
logger.trace("Extracted Path: " + epath.as_posix())

# handle individual file case, since os.walk doesn't
if epath.is_file():
entries: List[Software] = []
filepath = epath.as_posix()
# breakpoint()
try:
sw_parent, sw_children = get_software_entry(
context,
pm,
new_sbom,
filepath,
filetype=pm.hook.identify_file_type(filepath=filepath),
root_path=epath.parent.as_posix() if len(epath.parts) > 1 else "",
container_uuid=parent_uuid,
install_path=install_prefix,
user_institution_name=recorded_institution,
)
except Exception as e:
raise RuntimeError(f"Unable to process: {filepath}") from e
entries.append(sw_parent)
entries.extend(sw_children if sw_children else [])
new_sbom.add_software_entries(entries, parent_entry=parent_entry)
# epath was a file, no need to walk the directory tree
continue

# epath is a directory, walk it
for cdir, dirs, files in os.walk(epath):
logger.info("Processing " + str(cdir))

if entry.installPrefix:
for dir_ in dirs:
full_path = os.path.join(cdir, dir_)
if os.path.islink(full_path):
dest = resolve_link(full_path, cdir, epath, entry.installPrefix)
dest = resolve_link(
full_path, cdir, epath.as_posix(), entry.installPrefix
)
if dest is not None:
install_source = real_path_to_install_path(
epath, entry.installPrefix, full_path
epath.as_posix(), entry.installPrefix, full_path
)
install_dest = real_path_to_install_path(
epath, entry.installPrefix, dest
epath.as_posix(), entry.installPrefix, dest
)
dir_symlinks.append((install_source, install_dest))

Expand All @@ -362,7 +410,9 @@ def sbom(
# Record symlink details but don't run info extractors on them
if os.path.islink(filepath):
# NOTE: resolve_link function could print warning if symlink goes outside of extract path dir
true_filepath = resolve_link(filepath, cdir, epath, entry.installPrefix)
true_filepath = resolve_link(
filepath, cdir, epath.as_posix(), entry.installPrefix
)
# Dead/infinite links will error so skip them
if true_filepath is None:
continue
Expand All @@ -386,10 +436,10 @@ def sbom(
# Record symlink install path if an install prefix is given
if entry.installPrefix:
install_filepath = real_path_to_install_path(
epath, entry.installPrefix, filepath
epath.as_posix(), entry.installPrefix, filepath
)
install_dest = real_path_to_install_path(
epath, entry.installPrefix, true_filepath
epath.as_posix(), entry.installPrefix, true_filepath
)
# A dead link shows as a file so need to test if it's a
# file or a directory once rebased
Expand All @@ -407,88 +457,31 @@ def sbom(
# unpacking/installation?
continue

if entry.installPrefix or entry.installPrefix == "":
install_path = entry.installPrefix
elif not skip_install_path:
# epath is guaranteed to not have an ending slash due to formatting above
install_path = epath + "/"
else:
install_path = None
if os.path.isfile(filepath):
if ftype := pm.hook.identify_file_type(filepath=filepath):
if (
ftype := pm.hook.identify_file_type(filepath=filepath)
or include_all_files
):
try:
sw_parent, sw_children = get_software_entry(
context,
pm,
new_sbom,
filepath,
filetype=ftype,
root_path=epath,
root_path=epath.as_posix(),
container_uuid=parent_uuid,
install_path=install_path,
install_path=install_prefix,
user_institution_name=recorded_institution,
include_all_files=include_all_files
or entry.includeAllFiles,
)
except Exception as e:
raise RuntimeError(f"Unable to process: {filepath}") from e

if (
ftype := pm.hook.identify_file_type(filepath=filepath)
or include_all_files
):
try:
sw_parent, sw_children = get_software_entry(
context,
pm,
new_sbom,
filepath,
filetype=ftype,
root_path=epath,
container_uuid=parent_uuid,
install_path=install_path,
user_institution_name=recorded_institution,
include_all_files=include_all_files or entry.includeAllFiles,
)
except Exception as e:
raise RuntimeError(f"Unable to process: {filepath}") from e

entries.append(sw_parent)
for sw in sw_children:
entries.append(sw)

if entries:
# if a software entry already exists with a matching file hash, augment the info in the existing entry
for e in entries:
existing_sw = new_sbom.find_software(e.sha256)
warn_if_hash_collision(existing_sw, e)
if not existing_sw:
new_sbom.add_software(e)
# if the config file specified a parent/container for the file, add the new entry as a "Contains" relationship
if parent_entry:
parent_uuid = parent_entry.UUID
child_uuid = e.UUID
new_sbom.create_relationship(
parent_uuid, child_uuid, "Contains"
)
else:
existing_uuid, entry_uuid = existing_sw.merge(e)
# go through relationships and see if any need existing entries updated for the replaced uuid (e.g. merging SBOMs)
for rel in new_sbom.relationships:
if rel.xUUID == entry_uuid:
rel.xUUID = existing_uuid
if rel.yUUID == entry_uuid:
rel.yUUID = existing_uuid
# add a new contains relationship if the duplicate file is from a different container/archive than previous times seeing the file
if parent_entry:
parent_uuid = parent_entry.UUID
child_uuid = existing_uuid
# avoid duplicate entries
if not new_sbom.find_relationship(
parent_uuid, child_uuid, "Contains"
):
new_sbom.create_relationship(
parent_uuid, child_uuid, "Contains"
)
# TODO a pass later on to check for and remove duplicate relationships should be added just in case
entries.append(sw_parent)
entries.extend(sw_children if sw_children else [])
new_sbom.add_software_entries(entries, parent_entry=parent_entry)

# Add symlinks to install paths and file names
for software in new_sbom.software:
Expand Down
3 changes: 1 addition & 2 deletions surfactant/infoextractors/mach_o_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@


def supports_file(filetype) -> bool:
# TODO: It could be decided whether to keep it this way or separate into cases
return "MACHO" in filetype # Covers MACHOFAT, MACHOFAT64, MACHO32, MACHO64
return filetype in ("MACHOFAT", "MACHOFAT64", "MACHO32", "MACHO64")


@surfactant.plugin.hookimpl
Expand Down
39 changes: 39 additions & 0 deletions surfactant/sbomtypes/_sbom.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,45 @@ def add_software(self, sw: Software) -> None:
self.software_lookup_by_sha256[sw.sha256] = sw
self.software.append(sw)

def add_software_entries(
self, entries: Optional[List[Software]], parent_entry: Optional[Software] = None
):
"""Add software entries to the SBOM, merging into existing entries as needed.
Args:
entries (Optional[List[Software]]): A list of Software entries to add to the SBOM.
parent_entry (Optional[Software]): An optional parent software entry to add "Contains" relationships to.
"""
if not entries:
return
# if a software entry already exists with a matching file hash, augment the info in the existing entry
for e in entries:
existing_sw = self.find_software(e.sha256)
if Software.check_for_hash_collision(existing_sw, e):
logger.warning(
f"Hash collision between {existing_sw.name} and {e.name}; unexpected results may occur"
)
if not existing_sw:
self.add_software(e)
else:
existing_uuid, entry_uuid = existing_sw.merge(e)
# go through relationships and see if any need existing entries updated for the replaced uuid (e.g. merging SBOMs)
for rel in self.relationships:
if rel.xUUID == entry_uuid:
rel.xUUID = existing_uuid
if rel.yUUID == entry_uuid:
rel.yUUID = existing_uuid
# if a parent/container was specified for the file, add the new entry as a "Contains" relationship
if parent_entry:
parent_uuid = parent_entry.UUID
child_uuid = existing_uuid if existing_sw else e.UUID
# avoid duplicate relationships if the software entry already existed
if not existing_sw or not self.find_relationship(
parent_uuid, child_uuid, "Contains"
):
self.create_relationship(parent_uuid, child_uuid, "Contains")
# TODO a pass later on to check for and remove duplicate relationships should be added just in case

# pylint: disable=too-many-arguments
def create_software(
self,
Expand Down
19 changes: 19 additions & 0 deletions surfactant/sbomtypes/_software.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,22 @@ def merge(self, sw: Software):
current_arr.append(new_value)

return self.UUID, sw.UUID

@staticmethod
def check_for_hash_collision(soft1: Optional[Software], soft2: Optional[Software]) -> bool:
if not soft1 or not soft2:
return False
# A hash collision occurs if one or more but less than all hashes match or
# any hash matches but the filesize is different
collision = False
if soft1.sha256 == soft2.sha256 or soft1.sha1 == soft2.sha1 or soft1.md5 == soft2.md5:
# Hashes can be None; make sure they aren't before checking for inequality
if soft1.sha256 and soft2.sha256 and soft1.sha256 != soft2.sha256:
collision = True
elif soft1.sha1 and soft2.sha1 and soft1.sha1 != soft2.sha1:
collision = True
elif soft1.md5 and soft2.md5 and soft1.md5 != soft2.md5:
collision = True
elif soft1.size != soft2.size:
collision = True
return collision

0 comments on commit a795327

Please sign in to comment.