Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#Centipede Refactor distill.cc + Move shard writing into concurrent threads #946

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions centipede/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -917,9 +917,11 @@ cc_library(
":thread_pool",
":util",
":workdir",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
],
)
Expand Down
271 changes: 199 additions & 72 deletions centipede/distill.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@
#include "./centipede/distill.h"

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <memory>
#include <numeric>
#include <optional>
#include <sstream>
#include <string>
#include <string_view>
#include <thread> // NOLINT(build/c++11)
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "./centipede/blob_file.h"
#include "./centipede/defs.h"
Expand All @@ -41,100 +48,220 @@

namespace centipede {

using CorpusElt = std::pair<ByteArray, FeatureVec>;
using CorpusEltVec = std::vector<CorpusElt>;

namespace {

// One corpus element: an input together with its feature vector.
struct CorpusElt {
  // Copies `input` and takes over `features` (moved into the element).
  CorpusElt(const ByteArray &input, FeatureVec features)
      : input(input), features(std::move(features)) {}

  // Move-only: copying is disabled for efficiency, moving is the default.
  CorpusElt(CorpusElt &&) = default;
  CorpusElt &operator=(CorpusElt &&) = default;
  CorpusElt(const CorpusElt &) = delete;
  CorpusElt &operator=(const CorpusElt &) = delete;

  // Returns the result of `PackFeaturesAndHash()` applied to this element's
  // input and features.
  ByteArray PackedFeatures() const {
    return PackFeaturesAndHash(input, features);
  }

  // Data members. The declaration order (`input`, then `features`) is also
  // the initialization order used by the constructor above.
  ByteArray input;
  FeatureVec features;
};

// A sequence of corpus elements, e.g. everything read from one input shard.
using CorpusEltVec = std::vector<CorpusElt>;

// The maximum number of threads reading input shards concurrently. This is
// mainly to prevent I/O congestion. `DistillTask()` uses it as the size of its
// thread pool.
// TODO(ussuri): Bump up significantly when RSS-gated mutexing is in.
inline constexpr size_t kMaxReadingThreads = 1;

} // namespace
// Builds the prefix used by this file's log messages. The prefix embeds
// `env.my_shard_index`.
std::string LogPrefix(const Environment &env) {
  std::string prefix = absl::StrCat("DISTILL[S.", env.my_shard_index, "]: ");
  return prefix;
}

void DistillTask(const Environment &env,
const std::vector<size_t> &shard_indices) {
const std::string log_line =
absl::StrCat("DISTILL[S.", env.my_shard_index, "]: ");
// TODO(ussuri): Move the reader/writer classes to shard_reader.cc, rename it
// to corpus_io.cc, and reuse the new APIs where useful in the code base.

const WorkDir wd{env};
const auto corpus_path = wd.DistilledCorpusFiles().MyShardPath();
const auto features_path = wd.DistilledFeaturesFiles().MyShardPath();
LOG(INFO) << log_line << VV(env.total_shards) << VV(corpus_path)
// A helper class for reading input corpus shards. Thread-safe.
class InputCorpusShardReader {
public:
InputCorpusShardReader(const Environment &env)
: workdir_{env}, log_prefix_{LogPrefix(env)} {}

// Loads the corpus and features files of input shard `shard_idx` and returns
// the elements found there. Only const members and locals are touched, so
// concurrent calls do not share mutable state within this class.
CorpusEltVec ReadShard(size_t shard_idx) {
  const auto corpus_path = workdir_.CorpusFiles().ShardPath(shard_idx);
  const auto features_path = workdir_.FeaturesFiles().ShardPath(shard_idx);
  VLOG(1) << log_prefix_ << "reading input shard " << shard_idx << ":\n"
          << VV(corpus_path) << "\n"
          << VV(features_path);
  CorpusEltVec shard_elts;
  // Callback invoked for every (input, features) pair of the shard: the input
  // is copied and the features are moved into a new element.
  const auto collect = [&shard_elts](const ByteArray &input,
                                     FeatureVec &features) {
    shard_elts.emplace_back(input, std::move(features));
  };
  // Qualified to call the namespace-level function, not this method.
  centipede::ReadShard(corpus_path, features_path, collect);
  return shard_elts;
}

const auto corpus_writer = DefaultBlobFileWriterFactory(env.riegeli);
const auto features_writer = DefaultBlobFileWriterFactory(env.riegeli);
// NOTE: Overwrite distilled corpus and features files -- do not append.
CHECK_OK(corpus_writer->Open(corpus_path, "w"));
CHECK_OK(features_writer->Open(features_path, "w"));
private:
const WorkDir workdir_;
const std::string log_prefix_;
};

const size_t num_shards = shard_indices.size();
size_t num_read_shards = 0;
size_t num_read_elements = 0;
size_t num_distilled_elements = 0;
const auto corpus_files = wd.CorpusFiles();
const auto features_files = wd.FeaturesFiles();
// A helper class for writing corpus shards. Thread-safe.
class CorpusShardWriter {
public:
// The writing stats so far. A snapshot is returned by `GetStats()`.
struct Stats {
// Number of elements submitted for writing, whether or not they were written.
size_t num_total_elts = 0;
// Number of elements that passed `PreprocessElt()` and were actually written.
size_t num_written_elts = 0;
// Number of completed `WriteBatch()` calls.
size_t num_written_batches = 0;
};

std::vector<CorpusEltVec> elts_per_shard(num_shards);
FeatureSet feature_set(/*frequency_threshold=*/1,
env.MakeDomainDiscardMask());
// Resolves the distilled corpus/features paths of the current shard from
// `env` and opens both output files: in append mode ("a") when `append` is
// true, otherwise in overwrite mode ("w"). Failure to open either file is
// fatal (`CHECK_OK`).
CorpusShardWriter(const Environment &env, bool append)
    : workdir_{env},
      log_prefix_{LogPrefix(env)},
      corpus_path_{workdir_.DistilledCorpusFiles().MyShardPath()},
      features_path_{workdir_.DistilledFeaturesFiles().MyShardPath()},
      corpus_writer_{DefaultBlobFileWriterFactory()},
      feature_writer_{DefaultBlobFileWriterFactory()} {
  // Both files are always opened in the same mode.
  const char *const open_mode = append ? "a" : "w";
  CHECK_OK(corpus_writer_->Open(corpus_path_, open_mode));
  CHECK_OK(feature_writer_->Open(features_path_, open_mode));
}

// Read the shards in parallel.
{
ThreadPool threads{std::min<int>(kMaxReadingThreads, num_shards)};
// Virtual because this class is a base with a virtual customization point
// (`PreprocessElt()`).
virtual ~CorpusShardWriter() = default;

// Submits a single element for writing. Holds `mu_` for the duration of the
// call.
void WriteElt(CorpusElt elt) {
  absl::MutexLock guard{&mu_};
  WriteEltImpl(std::move(elt));
}

// Submits every element of `elts` for writing, in order, as one batch. `mu_`
// is held for the entire batch, and the batch counter is bumped once all
// elements have been processed (also for an empty batch).
void WriteBatch(CorpusEltVec elts) {
  absl::MutexLock guard{&mu_};
  VLOG(1) << log_prefix_ << "writing " << elts.size()
          << " elements to output shard:\n"
          << VV(corpus_path_) << "\n"
          << VV(features_path_);
  for (auto it = elts.begin(); it != elts.end(); ++it) {
    WriteEltImpl(std::move(*it));
  }
  stats_.num_written_batches += 1;
}

// Returns a copy of the current writing stats, taken while holding `mu_`.
Stats GetStats() const {
  absl::MutexLock guard{&mu_};
  Stats snapshot = stats_;
  return snapshot;
}

protected:
// Customization hook, called for each element before it is written. A derived
// class may inspect or modify `elt` (for example, trim its features), or
// return `std::nullopt` to have the element skipped. This default
// implementation passes the element through unchanged.
virtual std::optional<CorpusElt> PreprocessElt(CorpusElt elt) {
  return std::make_optional(std::move(elt));
}

private:
// Counts `elt`, runs it through `PreprocessElt()`, and, unless the hook
// rejected it, appends its input to the corpus file and its packed features
// to the features file. Write failures are fatal (`CHECK_OK`). The caller
// must hold `mu_`.
void WriteEltImpl(CorpusElt elt) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
  ++stats_.num_total_elts;
  const std::optional<CorpusElt> to_write = PreprocessElt(std::move(elt));
  // The hook asked for this element to be skipped.
  if (!to_write.has_value()) return;
  CHECK_OK(corpus_writer_->Write(to_write->input));
  CHECK_OK(feature_writer_->Write(to_write->PackedFeatures()));
  ++stats_.num_written_elts;
}

// Const state.
const WorkDir workdir_;
const std::string log_prefix_;
const std::string corpus_path_;
const std::string features_path_;

// Mutable state.
mutable absl::Mutex mu_;
std::unique_ptr<BlobFileWriter> corpus_writer_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<BlobFileWriter> feature_writer_ ABSL_GUARDED_BY(mu_);
Stats stats_ ABSL_GUARDED_BY(mu_);
};

// A helper class for writing distilled corpus shards. NOT thread-safe because
// all writes go to a single file.
class DistilledCorpusShardWriter : public CorpusShardWriter {
public:
// An extension to the parent class's `Stats`.
struct DistilledStats {
// The accumulated features of the distilled corpus so far, represents in
// the same compact textual form that Centipede uses in its fuzzing progress
// log messages, e.g.: "ft: 96331 cov: 81793 usr1: 5045 ...".
std::string coverage_str;
};

DistilledCorpusShardWriter(const Environment &env, bool append)
: CorpusShardWriter{env, append},
feature_set_{/*frequency_threshold=*/1, env.MakeDomainDiscardMask()} {}

~DistilledCorpusShardWriter() override = default;

DistilledStats GetDistilledStats() const {
absl::MutexLock lock(&mu_);
DistilledStats stats;
std::stringstream coverage_ss;
coverage_ss << feature_set_;
stats.coverage_str = coverage_ss.str();
return stats;
}

protected:
std::optional<CorpusElt> PreprocessElt(CorpusElt elt) override {
absl::MutexLock lock(&mu_);
feature_set_.PruneDiscardedDomains(elt.features);
if (!feature_set_.HasUnseenFeatures(elt.features)) return std::nullopt;
feature_set_.IncrementFrequencies(elt.features);
return std::move(elt);
}

private:
mutable absl::Mutex mu_;
FeatureSet feature_set_ ABSL_GUARDED_BY(mu_);
};

} // namespace

// Distills the input shards listed in `shard_indices` into the current
// shard's distilled corpus and features files. Each scheduled task reads one
// shard through an `InputCorpusShardReader`, reverses its elements, hands them
// to a shared `DistilledCorpusShardWriter` as one batch, and logs the running
// stats.
// NOTE(review): this span comes from a diff view and interleaves removed and
// added lines; only comments have been touched here.
void DistillTask(const Environment &env,
const std::vector<size_t> &shard_indices) {
// Read and write the shards in parallel, but gate reading of each on the
// availability of free RAM to keep the peak RAM usage under control.
// NOTE(review): no RAM-based gating is visible in this function; the only
// bound on concurrency here is the `kMaxReadingThreads` pool size.
const size_t num_shards = shard_indices.size();
InputCorpusShardReader reader{env};
// NOTE: Always overwrite corpus and features files, never append.
DistilledCorpusShardWriter writer{env, /*append=*/false};

{
ThreadPool threads{kMaxReadingThreads};
for (size_t shard_idx : shard_indices) {
CHECK_LT(shard_idx, num_shards);
threads.Schedule([corpus_path = corpus_files.ShardPath(shard_idx),
features_path = features_files.ShardPath(shard_idx),
&shard_elts = elts_per_shard[shard_idx], shard_idx,
&log_line] {
VLOG(2) << log_line << "reading shard " << shard_idx << " from:\n"
<< VV(corpus_path) << "\n"
<< VV(features_path);
// Read elements from the current shard.
ReadShard(corpus_path, features_path,
[&shard_elts](const ByteArray &input, FeatureVec &features) {
shard_elts.emplace_back(input, std::move(features));
});
// Reverse the order of inputs read from the current shard.
// The intuition is as follows:
threads.Schedule([shard_idx, &reader, &writer, &env, num_shards] {
CorpusEltVec shard_elts = reader.ReadShard(shard_idx);
// Reverse the order of elements. The intuition is as follows:
// * If the shard is the result of fuzzing with Centipede, the inputs
// that are closer to the end are more interesting, so we start there.
// that are closer to the end are more interesting, so we start there.
// * If the shard resulted from something else, the reverse order is
// not any better or worse than any other order.
// not any better or worse than any other order.
std::reverse(shard_elts.begin(), shard_elts.end());
writer.WriteBatch(std::move(shard_elts));
// The two stats snapshots are taken by separate calls, one after the other.
const auto stats = writer.GetStats();
const auto distilled_stats = writer.GetDistilledStats();
LOG(INFO) << LogPrefix(env) << distilled_stats.coverage_str
<< " batches: " << stats.num_written_batches << "/"
<< num_shards << " inputs: " << stats.num_total_elts
<< " distilled: " << stats.num_written_elts;
});
}
} // The reading threads join here.

for (size_t shard_idx : shard_indices) {
// Iterate the elts, add those that have new features.
// This is a simple linear greedy set cover algorithm.
auto &shard_elts = elts_per_shard[shard_idx];
VLOG(1) << log_line << "appending elements from input shard " << shard_idx
<< " to output shard";
for (auto &[input, features] : shard_elts) {
++num_read_elements;
feature_set.PruneDiscardedDomains(features);
if (!feature_set.HasUnseenFeatures(features)) continue;
feature_set.IncrementFrequencies(features);
// Append to the distilled corpus and features files.
CHECK_OK(corpus_writer->Write(input));
CHECK_OK(features_writer->Write(PackFeaturesAndHash(input, features)));
input.clear();
features.clear();
++num_distilled_elements;
VLOG_EVERY_N(10, 1000) << VV(num_distilled_elements);
}
shard_elts.clear();
++num_read_shards;
LOG(INFO) << log_line << feature_set << " src_shards: " << num_read_shards
<< "/" << num_shards << " src_elts: " << num_read_elements
<< " dist_elts: " << num_distilled_elements;
}
}

int Distill(const Environment &env) {
Expand Down
Loading