From d76ec1cecbf7453fa5c76b7b1da919b82de02c10 Mon Sep 17 00:00:00 2001 From: Martin Dobias Date: Wed, 16 Dec 2020 14:25:21 +0100 Subject: [PATCH] Update untwine to match the current hobu/untwine main branch Added a minor update to the untwine_to_qgis.bash script as it was getting confused about the "untwine" subfolders and copying sources one level deeper than it was supposed to --- external/untwine/bu/Processor.cpp | 11 ++++++- external/untwine/epf/BufferCache.cpp | 13 +++----- external/untwine/epf/BufferCache.hpp | 3 +- external/untwine/epf/Cell.cpp | 41 ++++++++++++++++++++------ external/untwine/epf/Cell.hpp | 23 +++++++++++---- external/untwine/epf/Epf.cpp | 6 ++-- external/untwine/epf/EpfTypes.hpp | 2 ++ external/untwine/epf/FileProcessor.cpp | 15 ++++++---- external/untwine/epf/Reprocessor.cpp | 1 - external/untwine/epf/Writer.cpp | 20 ++++++++++++- external/untwine/epf/Writer.hpp | 4 +-- external/untwine_to_qgis.bash | 10 +++---- 12 files changed, 105 insertions(+), 44 deletions(-) diff --git a/external/untwine/bu/Processor.cpp b/external/untwine/bu/Processor.cpp index e6a80ba635b..d02aec495ef 100644 --- a/external/untwine/bu/Processor.cpp +++ b/external/untwine/bu/Processor.cpp @@ -307,12 +307,21 @@ Processor::writeOctantCompressed(const OctantInfo& o, Index& index, IndexIter po } } flush: - flushCompressed(table, view, o); + try + { + flushCompressed(table, view, o); + } + catch (pdal::pdal_error& err) + { + fatal(err.what()); + } + m_manager.logOctant(o.key(), count); return pos; } +// Copy data from the source file to the point view. void Processor::appendCompressed(pdal::PointViewPtr view, const DimInfoList& dims, const FileInfo& fi, IndexIter begin, IndexIter end) { diff --git a/external/untwine/epf/BufferCache.cpp b/external/untwine/epf/BufferCache.cpp index 5ab5c6dbc0b..18a0446958f 100644 --- a/external/untwine/epf/BufferCache.cpp +++ b/external/untwine/epf/BufferCache.cpp @@ -21,9 +21,11 @@ namespace epf { // If we have a buffer in the cache, return it. 
Otherwise create a new one and return that. -DataVecPtr BufferCache::fetch() +// If nonblock is true and there are no available buffers, return null. +DataVecPtr BufferCache::fetch(std::unique_lock& lock, bool nonblock) { - std::unique_lock lock(m_mutex); + if (nonblock && m_buffers.empty() && m_count >= MaxBuffers) + return nullptr; m_cv.wait(lock, [this](){ return m_buffers.size() || m_count < MaxBuffers; }); if (m_buffers.size()) @@ -42,17 +44,10 @@ DataVecPtr BufferCache::fetch() // Put a buffer back in the cache. void BufferCache::replace(DataVecPtr&& buf) { - std::unique_lock lock(m_mutex); - - //ABELL - Fix this. -// buf->resize(BufSize); m_buffers.push_back(std::move(buf)); if (m_count == MaxBuffers) - { - lock.unlock(); m_cv.notify_one(); - } } } // namespace epf diff --git a/external/untwine/epf/BufferCache.hpp b/external/untwine/epf/BufferCache.hpp index d4f5c376287..7dbcf242495 100644 --- a/external/untwine/epf/BufferCache.hpp +++ b/external/untwine/epf/BufferCache.hpp @@ -31,12 +31,11 @@ public: BufferCache() : m_count(0) {} - DataVecPtr fetch(); + DataVecPtr fetch(std::unique_lock& lock, bool nonblock); void replace(DataVecPtr&& buf); private: std::deque m_buffers; - std::mutex m_mutex; std::condition_variable m_cv; int m_count; }; diff --git a/external/untwine/epf/Cell.cpp b/external/untwine/epf/Cell.cpp index 296578bad3e..bb0d1c756c0 100644 --- a/external/untwine/epf/Cell.cpp +++ b/external/untwine/epf/Cell.cpp @@ -21,21 +21,26 @@ namespace epf void Cell::initialize() { - m_buf = m_writer->bufferCache().fetch(); + m_buf = m_writer->fetchBuffer(); + + // If we couldn't fetch a buffer, flush all the the buffers for this processor and + // try again, but block. + if (!m_buf) + { + m_flush(this); + m_buf = m_writer->fetchBufferBlocking(); + } m_pos = m_buf->data(); m_endPos = m_pos + m_pointSize * (BufSize / m_pointSize); } +// NOTE - After write(), the cell is invalid and must be initialized or destroyed. 
void Cell::write() { - // Resize the buffer so the writer knows how much to write. size_t size = m_pos - m_buf->data(); if (size) -// { -// m_buf->resize(size); m_writer->enqueue(m_key, std::move(m_buf), size); -// } } void Cell::advance() @@ -61,17 +66,35 @@ Cell *CellMgr::get(const VoxelKey& key) auto it = m_cells.find(key); if (it == m_cells.end()) { - std::unique_ptr cell(new Cell(key, m_pointSize, m_writer)); + Cell::FlushFunc f = [this](Cell *exclude) + { + flush(exclude); + }; + std::unique_ptr cell(new Cell(key, m_pointSize, m_writer, f)); it = m_cells.insert( {key, std::move(cell)} ).first; } Cell& c = *(it->second); return &c; } -void CellMgr::flush() +// Eliminate all the cells and their associated data buffers except the `exclude` +// cell. +void CellMgr::flush(Cell *exclude) { - for (auto& cp : m_cells) - cp.second->write(); + CellMap::iterator it = m_cells.end(); + if (exclude) + it = m_cells.find(exclude->key()); + + // If there was no exclude cell or it isn't in our list, just clear the cells. + // Otherwise, save the exclude cell, clear the list, and reinsert. 
+ if (it == m_cells.end()) + m_cells.clear(); + else + { + std::unique_ptr c = std::move(it->second); + m_cells.clear(); + m_cells.insert({c->key(), std::move(c)}); + } } } // namespace epf diff --git a/external/untwine/epf/Cell.hpp b/external/untwine/epf/Cell.hpp index 3c1863bd56e..f1959e8a553 100644 --- a/external/untwine/epf/Cell.hpp +++ b/external/untwine/epf/Cell.hpp @@ -15,7 +15,8 @@ #include #include -#include +#include +#include #include #include "EpfTypes.hpp" @@ -35,12 +36,18 @@ class Writer; class Cell { public: - Cell(const VoxelKey& key, int pointSize, Writer *writer) : - m_key(key), m_pointSize(pointSize), m_writer(writer) + using FlushFunc = std::function; + + Cell(const VoxelKey& key, int pointSize, Writer *writer, FlushFunc flush) : + m_key(key), m_pointSize(pointSize), m_writer(writer), m_flush(flush) { assert(pointSize < BufSize); initialize(); } + ~Cell() + { + write(); + } void initialize(); Point point() @@ -49,7 +56,6 @@ public: { return m_key; } void copyPoint(Point& b) { std::copy(b.data(), b.data() + m_pointSize, m_pos); } - void write(); void advance(); private: @@ -59,19 +65,24 @@ private: Writer *m_writer; uint8_t *m_pos; uint8_t *m_endPos; + FlushFunc m_flush; + + void write(); }; class CellMgr { public: CellMgr(int pointSize, Writer *writer); + Cell *get(const VoxelKey& key); - void flush(); + void flush(Cell *exclude); private: + using CellMap = std::unordered_map>; int m_pointSize; Writer *m_writer; - std::map> m_cells; + CellMap m_cells; }; diff --git a/external/untwine/epf/Epf.cpp b/external/untwine/epf/Epf.cpp index 6fbe880e180..17e551a59fc 100644 --- a/external/untwine/epf/Epf.cpp +++ b/external/untwine/epf/Epf.cpp @@ -61,7 +61,7 @@ void writeMetadata(const std::string& tempDir, const Grid& grid, /// Epf -Epf::Epf() : m_pool(8) +Epf::Epf() : m_pool(NumFileProcessors) {} @@ -120,8 +120,8 @@ void Epf::run(const Options& options, ProgressWriter& progress) } } - // Make a writer with 4 threads. 
- m_writer.reset(new Writer(options.tempDir, 4, layout->pointSize())); + // Make a writer with NumWriters threads. + m_writer.reset(new Writer(options.tempDir, NumWriters, layout->pointSize())); // Sort file infos so the largest files come first. This helps to make sure we don't delay // processing big files that take the longest (use threads more efficiently). diff --git a/external/untwine/epf/EpfTypes.hpp b/external/untwine/epf/EpfTypes.hpp index 1f81b2e8c70..f0740e64bb1 100644 --- a/external/untwine/epf/EpfTypes.hpp +++ b/external/untwine/epf/EpfTypes.hpp @@ -36,6 +36,8 @@ using Totals = std::unordered_map; constexpr int MaxPointsPerNode = 100000; constexpr int BufSize = 4096 * 10; constexpr int MaxBuffers = 1000; +constexpr int NumWriters = 4; +constexpr int NumFileProcessors = 8; struct FileInfo { diff --git a/external/untwine/epf/FileProcessor.cpp b/external/untwine/epf/FileProcessor.cpp index b997e6bdc4e..07e498bd910 100644 --- a/external/untwine/epf/FileProcessor.cpp +++ b/external/untwine/epf/FileProcessor.cpp @@ -85,12 +85,17 @@ void FileProcessor::run() pdal::FixedPointTable t(1000); - f.prepare(t); - f.execute(t); - m_progress.update(count % CountIncrement); + try + { + f.prepare(t); + f.execute(t); + } + catch (const pdal::pdal_error& err) + { + fatal(err.what()); + } - // Flush any data remaining in the cells. 
- m_cellMgr.flush(); + m_progress.update(count % CountIncrement); } } // namespace epf diff --git a/external/untwine/epf/Reprocessor.cpp b/external/untwine/epf/Reprocessor.cpp index af6910426e3..c2b5cbf8859 100644 --- a/external/untwine/epf/Reprocessor.cpp +++ b/external/untwine/epf/Reprocessor.cpp @@ -66,7 +66,6 @@ void Reprocessor::run() cell->advance(); pos += m_pointSize; } - m_mgr.flush(); pdal::FileUtils::unmapFile(ctx); pdal::FileUtils::deleteFile(m_filename); } diff --git a/external/untwine/epf/Writer.cpp b/external/untwine/epf/Writer.cpp index de1ddad481f..3801cceace2 100644 --- a/external/untwine/epf/Writer.cpp +++ b/external/untwine/epf/Writer.cpp @@ -58,6 +58,24 @@ Totals Writer::totals(size_t minSize) return t; } +DataVecPtr Writer::fetchBuffer() +{ + std::unique_lock lock(m_mutex); + + // If there are fewer items in the queue than we have FileProcessors, we may choose not + // to block and return a nullptr, expecting that the caller will flush outstanding cells. + return m_bufferCache.fetch(lock, m_queue.size() < NumFileProcessors); +} + + +DataVecPtr Writer::fetchBufferBlocking() +{ + std::unique_lock lock(m_mutex); + + return m_bufferCache.fetch(lock, false); +} + + void Writer::enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize) { { @@ -123,9 +141,9 @@ void Writer::run() out.close(); if (!out) fatal("Failure writing to '" + path(wd.key) + "'."); - m_bufferCache.replace(std::move(wd.data)); std::lock_guard lock(m_mutex); + m_bufferCache.replace(std::move(wd.data)); m_active.remove(wd.key); } } diff --git a/external/untwine/epf/Writer.hpp b/external/untwine/epf/Writer.hpp index 9bed32cc492..fa507491a3e 100644 --- a/external/untwine/epf/Writer.hpp +++ b/external/untwine/epf/Writer.hpp @@ -43,11 +43,11 @@ public: void enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize); void stop(); - BufferCache& bufferCache() - { return m_bufferCache; } const Totals& totals() { return m_totals; } Totals totals(size_t minSize); + DataVecPtr 
fetchBuffer(); + DataVecPtr fetchBufferBlocking(); private: std::string path(const VoxelKey& key); diff --git a/external/untwine_to_qgis.bash b/external/untwine_to_qgis.bash index d8546a758e1..8703e91ffe0 100755 --- a/external/untwine_to_qgis.bash +++ b/external/untwine_to_qgis.bash @@ -1,12 +1,12 @@ #!/usr/bin/env bash -if [ "$#" -ne 1 ] ; then +if [ "$#" -ne 1 ] ; then echo "untwine_to_qgis: untwine directory argument required" exit 1 -fi +fi -UNTWINE_QGIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -UNTWINE_QGIS_DIR=$UNTWINE_QGIS_DIR/untwine +EXTERNAL_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +UNTWINE_QGIS_DIR=$EXTERNAL_DIR/untwine UNTWINE_DIR=$1 if [ ! -d "$UNTWINE_DIR/untwine" ] ; then @@ -20,7 +20,7 @@ echo "untwine_to_qgis: Remove old version" rm -rf $UNTWINE_QGIS_DIR/* echo "untwine_to_qgis: Copy new version" -rsync -r $UNTWINE_DIR $UNTWINE_QGIS_DIR --exclude="CMakeLists.txt*" --exclude="cmake/" --exclude="README.md" --exclude=".git" --exclude=".gitignore" +rsync -r $UNTWINE_DIR/ $UNTWINE_QGIS_DIR/ --exclude="CMakeLists.txt*" --exclude="cmake/" --exclude="README.md" --exclude=".git" --exclude=".gitignore" echo "untwine_to_qgis: Done" cd $PWD