Skip to content

Commit

Permalink
#Centipede Refactor distill.cc + Move shard writing into concurrent t…
Browse files Browse the repository at this point in the history
…hreads

PiperOrigin-RevId: 604398718
  • Loading branch information
ussuri authored and copybara-github committed Feb 5, 2024
1 parent 390b5aa commit 18cc5a7
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 72 deletions.
2 changes: 2 additions & 0 deletions centipede/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -917,9 +917,11 @@ cc_library(
":thread_pool",
":util",
":workdir",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
],
)
Expand Down
271 changes: 199 additions & 72 deletions centipede/distill.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@
#include "./centipede/distill.h"

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <memory>
#include <numeric>
#include <optional>
#include <sstream>
#include <string>
#include <string_view>
#include <thread> // NOLINT(build/c++11)
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "./centipede/blob_file.h"
#include "./centipede/defs.h"
Expand All @@ -41,100 +48,220 @@

namespace centipede {

using CorpusElt = std::pair<ByteArray, FeatureVec>;
using CorpusEltVec = std::vector<CorpusElt>;

namespace {

struct CorpusElt {
ByteArray input;
FeatureVec features;

CorpusElt(const ByteArray &input, FeatureVec features)
: input(input), features(std::move(features)) {}

// Movable, but not copyable for efficiency.
CorpusElt(const CorpusElt &) = delete;
CorpusElt &operator=(const CorpusElt &) = delete;
CorpusElt(CorpusElt &&) = default;
CorpusElt &operator=(CorpusElt &&) = default;

ByteArray PackedFeatures() const {
return PackFeaturesAndHash(input, features);
}
};

using CorpusEltVec = std::vector<CorpusElt>;

// The maximum number of threads reading input shards concurrently. This is
// mainly to prevent I/O congestion.
// TODO(ussuri): Bump up significantly when RSS-gated mutexing is in.
inline constexpr size_t kMaxReadingThreads = 1;

} // namespace
std::string LogPrefix(const Environment &env) {
return absl::StrCat("DISTILL[S.", env.my_shard_index, "]: ");
}

void DistillTask(const Environment &env,
const std::vector<size_t> &shard_indices) {
const std::string log_line =
absl::StrCat("DISTILL[S.", env.my_shard_index, "]: ");
// TODO(ussuri): Move the reader/writer classes to shard_reader.cc, rename it
// to corpus_io.cc, and reuse the new APIs where useful in the code base.

const WorkDir wd{env};
const auto corpus_path = wd.DistilledCorpusFiles().MyShardPath();
const auto features_path = wd.DistilledFeaturesFiles().MyShardPath();
LOG(INFO) << log_line << VV(env.total_shards) << VV(corpus_path)
// A helper class for reading input corpus shards. Thread-safe.
class InputCorpusShardReader {
public:
InputCorpusShardReader(const Environment &env)
: workdir_{env}, log_prefix_{LogPrefix(env)} {}

// Reads and returns a single shard's elements. Thread-safe.
CorpusEltVec ReadShard(size_t shard_idx) {
const auto corpus_path = workdir_.CorpusFiles().ShardPath(shard_idx);
const auto features_path = workdir_.FeaturesFiles().ShardPath(shard_idx);
VLOG(1) << log_prefix_ << "reading input shard " << shard_idx << ":\n"
<< VV(corpus_path) << "\n"
<< VV(features_path);
CorpusEltVec elts;
// Read elements from the current shard.
centipede::ReadShard( //
corpus_path, features_path,
[&elts](const ByteArray &input, FeatureVec &features) {
elts.emplace_back(input, std::move(features));
});
return elts;
}

const auto corpus_writer = DefaultBlobFileWriterFactory(env.riegeli);
const auto features_writer = DefaultBlobFileWriterFactory(env.riegeli);
// NOTE: Overwrite distilled corpus and features files -- do not append.
CHECK_OK(corpus_writer->Open(corpus_path, "w"));
CHECK_OK(features_writer->Open(features_path, "w"));
private:
const WorkDir workdir_;
const std::string log_prefix_;
};

const size_t num_shards = shard_indices.size();
size_t num_read_shards = 0;
size_t num_read_elements = 0;
size_t num_distilled_elements = 0;
const auto corpus_files = wd.CorpusFiles();
const auto features_files = wd.FeaturesFiles();
// A helper class for writing corpus shards. Thread-safe.
class CorpusShardWriter {
public:
// The writing stats so far.
struct Stats {
size_t num_total_elts = 0;
size_t num_written_elts = 0;
size_t num_written_batches = 0;
};

std::vector<CorpusEltVec> elts_per_shard(num_shards);
FeatureSet feature_set(/*frequency_threshold=*/1,
env.MakeDomainDiscardMask());
CorpusShardWriter(const Environment &env, bool append)
: workdir_{env},
log_prefix_{LogPrefix(env)},
corpus_path_{workdir_.DistilledCorpusFiles().MyShardPath()},
features_path_{workdir_.DistilledFeaturesFiles().MyShardPath()},
corpus_writer_{DefaultBlobFileWriterFactory()},
feature_writer_{DefaultBlobFileWriterFactory()} {
CHECK_OK(corpus_writer_->Open(corpus_path_, append ? "a" : "w"));
CHECK_OK(feature_writer_->Open(features_path_, append ? "a" : "w"));
}

// Read the shards in parallel.
{
ThreadPool threads{std::min<int>(kMaxReadingThreads, num_shards)};
virtual ~CorpusShardWriter() = default;

void WriteElt(CorpusElt elt) {
absl::MutexLock lock(&mu_);
WriteEltImpl(std::move(elt));
}

void WriteBatch(CorpusEltVec elts) {
absl::MutexLock lock(&mu_);
VLOG(1) << log_prefix_ << "writing " << elts.size()
<< " elements to output shard:\n"
<< VV(corpus_path_) << "\n"
<< VV(features_path_);
for (auto &elt : elts) {
WriteEltImpl(std::move(elt));
}
++stats_.num_written_batches;
}

Stats GetStats() const {
absl::MutexLock lock(&mu_);
return stats_;
}

protected:
// A behavior customization point: a derived class gets an opportunity to
// analyze and/or preprocess `elt` before it is written. For example, a
// derived class can trim the element's feature set before it is written, or
// choose to skip writing it entirely by returning `std::nullopt`.
virtual std::optional<CorpusElt> PreprocessElt(CorpusElt elt) {
return std::move(elt);
}

private:
void WriteEltImpl(CorpusElt elt) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
++stats_.num_total_elts;
const auto preprocessed_elt = PreprocessElt(std::move(elt));
if (preprocessed_elt.has_value()) {
// Append to the distilled corpus and features files.
CHECK_OK(corpus_writer_->Write(preprocessed_elt->input));
CHECK_OK(feature_writer_->Write(preprocessed_elt->PackedFeatures()));
++stats_.num_written_elts;
}
}

// Const state.
const WorkDir workdir_;
const std::string log_prefix_;
const std::string corpus_path_;
const std::string features_path_;

// Mutable state.
mutable absl::Mutex mu_;
std::unique_ptr<BlobFileWriter> corpus_writer_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<BlobFileWriter> feature_writer_ ABSL_GUARDED_BY(mu_);
Stats stats_ ABSL_GUARDED_BY(mu_);
};

// A helper class for writing distilled corpus shards. NOT thread-safe because
// all writes go to a single file.
class DistilledCorpusShardWriter : public CorpusShardWriter {
public:
// An extension to the parent class's `Stats`.
struct DistilledStats {
// The accumulated features of the distilled corpus so far, represents in
// the same compact textual form that Centipede uses in its fuzzing progress
// log messages, e.g.: "ft: 96331 cov: 81793 usr1: 5045 ...".
std::string coverage_str;
};

DistilledCorpusShardWriter(const Environment &env, bool append)
: CorpusShardWriter{env, append},
feature_set_{/*frequency_threshold=*/1, env.MakeDomainDiscardMask()} {}

~DistilledCorpusShardWriter() override = default;

DistilledStats GetDistilledStats() const {
absl::MutexLock lock(&mu_);
DistilledStats stats;
std::stringstream coverage_ss;
coverage_ss << feature_set_;
stats.coverage_str = coverage_ss.str();
return stats;
}

protected:
std::optional<CorpusElt> PreprocessElt(CorpusElt elt) override {
absl::MutexLock lock(&mu_);
feature_set_.PruneDiscardedDomains(elt.features);
if (!feature_set_.HasUnseenFeatures(elt.features)) return std::nullopt;
feature_set_.IncrementFrequencies(elt.features);
return std::move(elt);
}

private:
mutable absl::Mutex mu_;
FeatureSet feature_set_ ABSL_GUARDED_BY(mu_);
};

} // namespace

void DistillTask(const Environment &env,
const std::vector<size_t> &shard_indices) {
// Read and write the shards in parallel, but gate reading of each on the
// availability of free RAM to keep the peak RAM usage under control.
const size_t num_shards = shard_indices.size();
InputCorpusShardReader reader{env};
// NOTE: Always overwrite corpus and features files, never append.
DistilledCorpusShardWriter writer{env, /*append=*/false};

{
ThreadPool threads{kMaxReadingThreads};
for (size_t shard_idx : shard_indices) {
CHECK_LT(shard_idx, num_shards);
threads.Schedule([corpus_path = corpus_files.ShardPath(shard_idx),
features_path = features_files.ShardPath(shard_idx),
&shard_elts = elts_per_shard[shard_idx], shard_idx,
&log_line] {
VLOG(2) << log_line << "reading shard " << shard_idx << " from:\n"
<< VV(corpus_path) << "\n"
<< VV(features_path);
// Read elements from the current shard.
ReadShard(corpus_path, features_path,
[&shard_elts](const ByteArray &input, FeatureVec &features) {
shard_elts.emplace_back(input, std::move(features));
});
// Reverse the order of inputs read from the current shard.
// The intuition is as follows:
threads.Schedule([shard_idx, &reader, &writer, &env, num_shards] {
CorpusEltVec shard_elts = reader.ReadShard(shard_idx);
// Reverse the order of elements. The intuition is as follows:
// * If the shard is the result of fuzzing with Centipede, the inputs
// that are closer to the end are more interesting, so we start there.
// that are closer to the end are more interesting, so we start there.
// * If the shard resulted from somethening else, the reverse order is
// not any better or worse than any other order.
// not any better or worse than any other order.
std::reverse(shard_elts.begin(), shard_elts.end());
writer.WriteBatch(std::move(shard_elts));
const auto stats = writer.GetStats();
const auto distilled_stats = writer.GetDistilledStats();
LOG(INFO) << LogPrefix(env) << distilled_stats.coverage_str
<< " batches: " << stats.num_written_batches << "/"
<< num_shards << " inputs: " << stats.num_total_elts
<< " distilled: " << stats.num_written_elts;
});
}
} // The reading threads join here.

for (size_t shard_idx : shard_indices) {
// Iterate the elts, add those that have new features.
// This is a simple linear greedy set cover algorithm.
auto &shard_elts = elts_per_shard[shard_idx];
VLOG(1) << log_line << "appending elements from input shard " << shard_idx
<< " to output shard";
for (auto &[input, features] : shard_elts) {
++num_read_elements;
feature_set.PruneDiscardedDomains(features);
if (!feature_set.HasUnseenFeatures(features)) continue;
feature_set.IncrementFrequencies(features);
// Append to the distilled corpus and features files.
CHECK_OK(corpus_writer->Write(input));
CHECK_OK(features_writer->Write(PackFeaturesAndHash(input, features)));
input.clear();
features.clear();
++num_distilled_elements;
VLOG_EVERY_N(10, 1000) << VV(num_distilled_elements);
}
shard_elts.clear();
++num_read_shards;
LOG(INFO) << log_line << feature_set << " src_shards: " << num_read_shards
<< "/" << num_shards << " src_elts: " << num_read_elements
<< " dist_elts: " << num_distilled_elements;
}
}

int Distill(const Environment &env) {
Expand Down

0 comments on commit 18cc5a7

Please sign in to comment.