Update untwine to match the current hobu/untwine main branch

Added a minor update to the untwine_to_qgis.bash script as it was
getting confused about the "untwine" subfolders and copying sources
one level deeper as supposed to
This commit is contained in:
Martin Dobias 2020-12-16 14:25:21 +01:00
parent 42b33db3bc
commit d76ec1cecb
12 changed files with 105 additions and 44 deletions

View File

@ -307,12 +307,21 @@ Processor::writeOctantCompressed(const OctantInfo& o, Index& index, IndexIter po
}
}
flush:
flushCompressed(table, view, o);
try
{
flushCompressed(table, view, o);
}
catch (pdal::pdal_error& err)
{
fatal(err.what());
}
m_manager.logOctant(o.key(), count);
return pos;
}
// Copy data from the source file to the point view.
void Processor::appendCompressed(pdal::PointViewPtr view, const DimInfoList& dims,
const FileInfo& fi, IndexIter begin, IndexIter end)
{

View File

@ -21,9 +21,11 @@ namespace epf
{
// If we have a buffer in the cache, return it. Otherwise create a new one and return that.
DataVecPtr BufferCache::fetch()
// If nonblock is true and there are no available buffers, return null.
DataVecPtr BufferCache::fetch(std::unique_lock<std::mutex>& lock, bool nonblock)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (nonblock && m_buffers.empty() && m_count >= MaxBuffers)
return nullptr;
m_cv.wait(lock, [this](){ return m_buffers.size() || m_count < MaxBuffers; });
if (m_buffers.size())
@ -42,17 +44,10 @@ DataVecPtr BufferCache::fetch()
// Put a buffer back in the cache.
void BufferCache::replace(DataVecPtr&& buf)
{
std::unique_lock<std::mutex> lock(m_mutex);
//ABELL - Fix this.
// buf->resize(BufSize);
m_buffers.push_back(std::move(buf));
if (m_count == MaxBuffers)
{
lock.unlock();
m_cv.notify_one();
}
}
} // namespace epf

View File

@ -31,12 +31,11 @@ public:
BufferCache() : m_count(0)
{}
DataVecPtr fetch();
DataVecPtr fetch(std::unique_lock<std::mutex>& lock, bool nonblock);
void replace(DataVecPtr&& buf);
private:
std::deque<DataVecPtr> m_buffers;
std::mutex m_mutex;
std::condition_variable m_cv;
int m_count;
};

View File

@ -21,21 +21,26 @@ namespace epf
void Cell::initialize()
{
m_buf = m_writer->bufferCache().fetch();
m_buf = m_writer->fetchBuffer();
// If we couldn't fetch a buffer, flush all the the buffers for this processor and
// try again, but block.
if (!m_buf)
{
m_flush(this);
m_buf = m_writer->fetchBufferBlocking();
}
m_pos = m_buf->data();
m_endPos = m_pos + m_pointSize * (BufSize / m_pointSize);
}
// NOTE - After write(), the cell is invalid and must be initialized or destroyed.
void Cell::write()
{
// Resize the buffer so the writer knows how much to write.
size_t size = m_pos - m_buf->data();
if (size)
// {
// m_buf->resize(size);
m_writer->enqueue(m_key, std::move(m_buf), size);
// }
}
void Cell::advance()
@ -61,17 +66,35 @@ Cell *CellMgr::get(const VoxelKey& key)
auto it = m_cells.find(key);
if (it == m_cells.end())
{
std::unique_ptr<Cell> cell(new Cell(key, m_pointSize, m_writer));
Cell::FlushFunc f = [this](Cell *exclude)
{
flush(exclude);
};
std::unique_ptr<Cell> cell(new Cell(key, m_pointSize, m_writer, f));
it = m_cells.insert( {key, std::move(cell)} ).first;
}
Cell& c = *(it->second);
return &c;
}
void CellMgr::flush()
// Eliminate all the cells and their associated data buffers except the `exclude`
// cell.
void CellMgr::flush(Cell *exclude)
{
for (auto& cp : m_cells)
cp.second->write();
CellMap::iterator it = m_cells.end();
if (exclude)
it = m_cells.find(exclude->key());
// If there was no exclude cell or it isn't in our list, just clear the cells.
// Otherwise, save the exclude cell, clear the list, and reinsert.
if (it == m_cells.end())
m_cells.clear();
else
{
std::unique_ptr<Cell> c = std::move(it->second);
m_cells.clear();
m_cells.insert({c->key(), std::move(c)});
}
}
} // namespace epf

View File

@ -15,7 +15,8 @@
#include <cstdint>
#include <cstddef>
#include <map>
#include <functional>
#include <unordered_map>
#include <memory>
#include "EpfTypes.hpp"
@ -35,12 +36,18 @@ class Writer;
class Cell
{
public:
Cell(const VoxelKey& key, int pointSize, Writer *writer) :
m_key(key), m_pointSize(pointSize), m_writer(writer)
using FlushFunc = std::function<void(Cell *)>;
Cell(const VoxelKey& key, int pointSize, Writer *writer, FlushFunc flush) :
m_key(key), m_pointSize(pointSize), m_writer(writer), m_flush(flush)
{
assert(pointSize < BufSize);
initialize();
}
~Cell()
{
write();
}
void initialize();
Point point()
@ -49,7 +56,6 @@ public:
{ return m_key; }
void copyPoint(Point& b)
{ std::copy(b.data(), b.data() + m_pointSize, m_pos); }
void write();
void advance();
private:
@ -59,19 +65,24 @@ private:
Writer *m_writer;
uint8_t *m_pos;
uint8_t *m_endPos;
FlushFunc m_flush;
void write();
};
class CellMgr
{
public:
CellMgr(int pointSize, Writer *writer);
Cell *get(const VoxelKey& key);
void flush();
void flush(Cell *exclude);
private:
using CellMap = std::unordered_map<VoxelKey, std::unique_ptr<Cell>>;
int m_pointSize;
Writer *m_writer;
std::map<VoxelKey, std::unique_ptr<Cell>> m_cells;
CellMap m_cells;
};

View File

@ -61,7 +61,7 @@ void writeMetadata(const std::string& tempDir, const Grid& grid,
/// Epf
Epf::Epf() : m_pool(8)
Epf::Epf() : m_pool(NumFileProcessors)
{}
@ -120,8 +120,8 @@ void Epf::run(const Options& options, ProgressWriter& progress)
}
}
// Make a writer with 4 threads.
m_writer.reset(new Writer(options.tempDir, 4, layout->pointSize()));
// Make a writer with NumWriters threads.
m_writer.reset(new Writer(options.tempDir, NumWriters, layout->pointSize()));
// Sort file infos so the largest files come first. This helps to make sure we don't delay
// processing big files that take the longest (use threads more efficiently).

View File

@ -36,6 +36,8 @@ using Totals = std::unordered_map<VoxelKey, size_t>;
constexpr int MaxPointsPerNode = 100000;
constexpr int BufSize = 4096 * 10;
constexpr int MaxBuffers = 1000;
constexpr int NumWriters = 4;
constexpr int NumFileProcessors = 8;
struct FileInfo
{

View File

@ -85,12 +85,17 @@ void FileProcessor::run()
pdal::FixedPointTable t(1000);
f.prepare(t);
f.execute(t);
m_progress.update(count % CountIncrement);
try
{
f.prepare(t);
f.execute(t);
}
catch (const pdal::pdal_error& err)
{
fatal(err.what());
}
// Flush any data remaining in the cells.
m_cellMgr.flush();
m_progress.update(count % CountIncrement);
}
} // namespace epf

View File

@ -66,7 +66,6 @@ void Reprocessor::run()
cell->advance();
pos += m_pointSize;
}
m_mgr.flush();
pdal::FileUtils::unmapFile(ctx);
pdal::FileUtils::deleteFile(m_filename);
}

View File

@ -58,6 +58,24 @@ Totals Writer::totals(size_t minSize)
return t;
}
DataVecPtr Writer::fetchBuffer()
{
std::unique_lock<std::mutex> lock(m_mutex);
// If there are fewer items in the queue than we have FileProcessors, we may choose not
// to block and return a nullptr, expecting that the caller will flush outstanding cells.
return m_bufferCache.fetch(lock, m_queue.size() < NumFileProcessors);
}
DataVecPtr Writer::fetchBufferBlocking()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_bufferCache.fetch(lock, false);
}
void Writer::enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize)
{
{
@ -123,9 +141,9 @@ void Writer::run()
out.close();
if (!out)
fatal("Failure writing to '" + path(wd.key) + "'.");
m_bufferCache.replace(std::move(wd.data));
std::lock_guard<std::mutex> lock(m_mutex);
m_bufferCache.replace(std::move(wd.data));
m_active.remove(wd.key);
}
}

View File

@ -43,11 +43,11 @@ public:
void enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize);
void stop();
BufferCache& bufferCache()
{ return m_bufferCache; }
const Totals& totals()
{ return m_totals; }
Totals totals(size_t minSize);
DataVecPtr fetchBuffer();
DataVecPtr fetchBufferBlocking();
private:
std::string path(const VoxelKey& key);

View File

@ -1,12 +1,12 @@
#!/usr/bin/env bash
if [ "$#" -ne 1 ] ; then
if [ "$#" -ne 1 ] ; then
echo "untwine_to_qgis: untwine directory argument required"
exit 1
fi
fi
UNTWINE_QGIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
UNTWINE_QGIS_DIR=$UNTWINE_QGIS_DIR/untwine
EXTERNAL_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
UNTWINE_QGIS_DIR=$EXTERNAL_DIR/untwine
UNTWINE_DIR=$1
if [ ! -d "$UNTWINE_DIR/untwine" ] ; then
@ -20,7 +20,7 @@ echo "untwine_to_qgis: Remove old version"
rm -rf $UNTWINE_QGIS_DIR/*
echo "untwine_to_qgis: Copy new version"
rsync -r $UNTWINE_DIR $UNTWINE_QGIS_DIR --exclude="CMakeLists.txt*" --exclude="cmake/" --exclude="README.md" --exclude=".git" --exclude=".gitignore"
rsync -r $UNTWINE_DIR/ $UNTWINE_QGIS_DIR/ --exclude="CMakeLists.txt*" --exclude="cmake/" --exclude="README.md" --exclude=".git" --exclude=".gitignore"
echo "untwine_to_qgis: Done"
cd $PWD