From dff9ec2d376eb9e12e282f7c7e0a0640f36ad0be Mon Sep 17 00:00:00 2001 From: Ryzerth Date: Mon, 28 Jun 2021 22:06:42 +0200 Subject: [PATCH] new buffer thingy --- core/src/dsp/buffer.h | 108 +++++++++++++++++++++++++++++++++++ core/src/gui/main_window.cpp | 4 ++ core/src/signal_path/dsp.cpp | 9 ++- core/src/signal_path/dsp.h | 2 + 4 files changed, 121 insertions(+), 2 deletions(-) diff --git a/core/src/dsp/buffer.h b/core/src/dsp/buffer.h index c5999f6..2bd3824 100644 --- a/core/src/dsp/buffer.h +++ b/core/src/dsp/buffer.h @@ -214,4 +214,112 @@ namespace dsp { std::condition_variable canReadVar; std::condition_variable canWriteVar; }; + +#define TEST_BUFFER_SIZE 32 + + template + class SampleFrameBuffer : public generic_block> { + public: + SampleFrameBuffer() {} + + SampleFrameBuffer(stream* in) { init(in); } + + ~SampleFrameBuffer() { + stop(); + out.stopWriter(); + stopWorker = true; + cnd.notify_all(); + if (readWorkerThread.joinable()) { readWorkerThread.join(); } + } + + void init(stream* in) { + _in = in; + + for (int i = 0; i < TEST_BUFFER_SIZE; i++) { + buffers[i] = new T[STREAM_BUFFER_SIZE]; + } + + generic_block>::registerInput(in); + generic_block>::registerOutput(&out); + + readWorkerThread = std::thread(&SampleFrameBuffer::worker, this); + } + + void setInput(stream* in) { + std::lock_guard lck(generic_block>::ctrlMtx); + generic_block>::tempStop(); + generic_block>::unregisterInput(_in); + _in = in; + generic_block>::registerInput(_in); + generic_block>::tempStart(); + } + + int run() { + // Wait for data + int count = _in->read(); + if (count < 0) { return -1; } + + if (bypass) { + memcpy(out.writeBuf, _in->readBuf, count * sizeof(T)); + _in->flush(); + if (!out.swap(count)) { return -1; } + return count; + } + + // Push it on the ring buffer + { + std::lock_guard lck(bufMtx); + memcpy(buffers[writeCur], _in->readBuf, count * sizeof(T)); + uintptr_t ptr = (uintptr_t)buffers[writeCur]; + sizes[writeCur] = count; + writeCur++; + writeCur = ((writeCur) % TEST_BUFFER_SIZE); + + if (((writeCur - readCur + TEST_BUFFER_SIZE) % TEST_BUFFER_SIZE) >= (TEST_BUFFER_SIZE-2)) { + spdlog::warn("Overflow"); + } + } + cnd.notify_all(); + _in->flush(); + return count; + } + + void worker() { + while (true) { + // Wait for data + std::unique_lock lck(bufMtx); + cnd.wait(lck, [this](){ return (((writeCur - readCur + TEST_BUFFER_SIZE) % TEST_BUFFER_SIZE) > 0) || stopWorker; }); + if (stopWorker) { break; } + + // Write one to output buffer and unlock in preparation to swap buffers + int count = sizes[readCur]; + memcpy(out.writeBuf, buffers[readCur], count * sizeof(T)); + readCur++; + readCur = ((readCur) % TEST_BUFFER_SIZE); + lck.unlock(); + + // Swap + if (!out.swap(count)) { break; } + } + } + + stream out; + + int writeCur = 0; + int readCur = 0; + + bool bypass = false; + + private: + stream* _in; + + std::thread readWorkerThread; + std::mutex bufMtx; + std::condition_variable cnd; + T* buffers[TEST_BUFFER_SIZE]; + int sizes[TEST_BUFFER_SIZE]; + + bool stopWorker = false; + + }; }; \ No newline at end of file diff --git a/core/src/gui/main_window.cpp b/core/src/gui/main_window.cpp index cdde138..2181f4e 100644 --- a/core/src/gui/main_window.cpp +++ b/core/src/gui/main_window.cpp @@ -459,6 +459,10 @@ void MainWindow::draw() { ImGui::Checkbox("Show demo window", &demoWindow); ImGui::Text("ImGui version: %s", ImGui::GetVersion()); + ImGui::Checkbox("Bypass buffering", &sigpath::signalPath.inputBuffer.bypass); + + ImGui::Text("Buffering: %d", (sigpath::signalPath.inputBuffer.writeCur - sigpath::signalPath.inputBuffer.readCur + 20) % 20); + if (ImGui::Button("Test Bug")) { spdlog::error("Will this make the software crash?"); } diff --git a/core/src/signal_path/dsp.cpp b/core/src/signal_path/dsp.cpp index 18698dd..e8daa39 100644 --- a/core/src/signal_path/dsp.cpp +++ b/core/src/signal_path/dsp.cpp @@ -10,7 +10,9 @@ void SignalPath::init(uint64_t sampleRate, int fftRate, int fftSize, dsp::stream this->fftSize = fftSize; inputBlockSize = sampleRate / 200.0f; - split.init(input); + // split.init(input); + inputBuffer.init(input); + split.init(&inputBuffer.out); reshape.init(&fftStream, fftSize, (sampleRate / fftRate) - fftSize); split.bindStream(&fftStream); @@ -45,12 +47,14 @@ double SignalPath::getSampleRate() { } void SignalPath::start() { + inputBuffer.start(); split.start(); reshape.start(); fftHandlerSink.start(); } void SignalPath::stop() { + inputBuffer.stop(); split.stop(); reshape.stop(); fftHandlerSink.stop(); @@ -83,7 +87,8 @@ void SignalPath::removeVFO(std::string name) { } void SignalPath::setInput(dsp::stream* input) { - split.setInput(input); + // split.setInput(input); + inputBuffer.setInput(input); } void SignalPath::bindIQStream(dsp::stream* stream) { diff --git a/core/src/signal_path/dsp.h b/core/src/signal_path/dsp.h index e01f3c3..99fc94b 100644 --- a/core/src/signal_path/dsp.h +++ b/core/src/signal_path/dsp.h @@ -21,6 +21,8 @@ public: void startFFT(); void stopFFT(); + dsp::SampleFrameBuffer inputBuffer; + private: struct VFO_t { dsp::stream* inputStream;