Files
valhallir-deconvolver/pkg/convolve/convolve.go

654 lines
17 KiB
Go

package convolve
import (
"fmt"
"log"
"math"
"math/cmplx"
"os"
"image/png"
"image/color"
"path/filepath"
"github.com/mjibson/go-dsp/fft"
"gonum.org/v1/gonum/dsp/fourier"
"gonum.org/v1/plot"
"gonum.org/v1/plot/font"
"gonum.org/v1/plot/plotter"
"gonum.org/v1/plot/vg"
"gonum.org/v1/plot/vg/draw"
"gonum.org/v1/plot/vg/vgimg"
)
// nextPowerOfTwo returns the next power of two >= n
func nextPowerOfTwo(n int) int {
p := 1
for p < n {
p <<= 1
}
return p
}
// Convolve performs FFT-based convolution of two audio signals
// Deprecated: Use Deconvolve for IR extraction from sweep and recorded signals
func Convolve(signal1, signal2 []float64) []float64 {
resultLen := len(signal1) + len(signal2) - 1
fftLen := nextPowerOfTwo(resultLen)
log.Printf("[convolve] signal1: %d, signal2: %d, resultLen: %d, fftLen: %d", len(signal1), len(signal2), resultLen, fftLen)
// Zero-pad both signals to fftLen as float64
x := make([]float64, fftLen)
copy(x, signal1)
y := make([]float64, fftLen)
copy(y, signal2)
// FFT
fft := fourier.NewFFT(fftLen)
xFreq := fft.Coefficients(nil, x) // []complex128
yFreq := fft.Coefficients(nil, y) // []complex128
log.Printf("[convolve] xFreq length: %d, yFreq length: %d", len(xFreq), len(yFreq))
// Multiply in frequency domain
outFreq := make([]complex128, len(xFreq))
for i := 0; i < len(xFreq) && i < len(yFreq); i++ {
outFreq[i] = xFreq[i] * yFreq[i]
}
// Inverse FFT (returns []float64)
outTime := fft.Sequence(nil, outFreq)
log.Printf("[convolve] outTime length: %d", len(outTime))
// Defensive: avoid index out of range
copyLen := resultLen
if len(outTime) < resultLen {
log.Printf("[convolve] Warning: outTime length (%d) < resultLen (%d), truncating resultLen", len(outTime), resultLen)
copyLen = len(outTime)
}
result := make([]float64, copyLen)
copy(result, outTime[:copyLen])
return result
}
// Deconvolve extracts the impulse response (IR) from a sweep and its recorded version
// by dividing the FFT of the recorded by the FFT of the sweep, with regularization.
func Deconvolve(sweep, recorded []float64) []float64 {
resultLen := len(recorded)
fftLen := nextPowerOfTwo(resultLen)
log.Printf("[deconvolve] sweep: %d, recorded: %d, resultLen: %d, fftLen: %d", len(sweep), len(recorded), resultLen, fftLen)
// Zero-pad both signals to fftLen
sweepPadded := make([]float64, fftLen)
recordedPadded := make([]float64, fftLen)
copy(sweepPadded, sweep)
copy(recordedPadded, recorded)
fft := fourier.NewFFT(fftLen)
sweepFFT := fft.Coefficients(nil, sweepPadded)
recordedFFT := fft.Coefficients(nil, recordedPadded)
log.Printf("[deconvolve] sweepFFT length: %d, recordedFFT length: %d", len(sweepFFT), len(recordedFFT))
// Regularization epsilon to avoid division by zero
const epsilon = 1e-10
minLen := len(sweepFFT)
if len(recordedFFT) < minLen {
minLen = len(recordedFFT)
}
irFFT := make([]complex128, minLen)
for i := 0; i < minLen; i++ {
denom := sweepFFT[i]
if cmplx.Abs(denom) < epsilon {
denom = complex(epsilon, 0)
}
irFFT[i] = recordedFFT[i] / denom
}
irTime := fft.Sequence(nil, irFFT)
log.Printf("[deconvolve] irTime length: %d", len(irTime))
// Defensive: avoid index out of range
copyLen := resultLen
if len(irTime) < resultLen {
log.Printf("[deconvolve] Warning: irTime length (%d) < resultLen (%d), truncating resultLen", len(irTime), resultLen)
copyLen = len(irTime)
}
result := make([]float64, copyLen)
copy(result, irTime[:copyLen])
return result
}
// Normalize normalizes the audio data to prevent clipping
// targetPeak is the maximum peak value (e.g., 0.95 for 95% of full scale)
func Normalize(data []float64, targetPeak float64) []float64 {
if len(data) == 0 {
return data
}
// Find the maximum absolute value
maxVal := 0.0
for _, sample := range data {
absVal := math.Abs(sample)
if absVal > maxVal {
maxVal = absVal
}
}
if maxVal == 0 {
return data
}
// Calculate normalization factor
normFactor := targetPeak / maxVal
// Apply normalization
normalized := make([]float64, len(data))
for i, sample := range data {
normalized[i] = sample * normFactor
}
return normalized
}
// TrimSilence removes leading and trailing silence from the audio data
// threshold is the amplitude threshold below which samples are considered silence
func TrimSilence(data []float64, threshold float64) []float64 {
if len(data) == 0 {
return data
}
// Find start (first non-silent sample)
start := 0
for i, sample := range data {
if math.Abs(sample) > threshold {
start = i
break
}
}
// Find end (last non-silent sample)
end := len(data) - 1
for i := len(data) - 1; i >= 0; i-- {
if math.Abs(data[i]) > threshold {
end = i
break
}
}
if start >= end {
return []float64{}
}
return data[start : end+1]
}
// TrimOrPad trims or zero-pads the data to the specified number of samples
func TrimOrPad(data []float64, targetSamples int) []float64 {
if len(data) == targetSamples {
return data
} else if len(data) > targetSamples {
return data[:targetSamples]
} else {
out := make([]float64, targetSamples)
copy(out, data)
// zero-padding is default
return out
}
}
// padOrTruncate ensures a slice is exactly n elements long
func padOrTruncate[T any](in []T, n int) []T {
if len(in) == n {
return in
} else if len(in) > n {
return in[:n]
}
out := make([]T, n)
copy(out, in)
return out
}
// Helper to reconstruct full Hermitian spectrum from N/2+1 real FFT
func hermitianSymmetric(fullLen int, halfSpec []complex128) []complex128 {
full := make([]complex128, fullLen)
N := fullLen
// DC
full[0] = halfSpec[0]
// Positive freqs
for k := 1; k < N/2; k++ {
full[k] = halfSpec[k]
full[N-k] = cmplx.Conj(halfSpec[k])
}
// Nyquist (if even)
if N%2 == 0 {
full[N/2] = halfSpec[N/2]
}
return full
}
// MinimumPhaseTransform using go-dsp/fft for full complex FFT/IFFT
func MinimumPhaseTransform(ir []float64) []float64 {
if len(ir) == 0 {
return ir
}
origLen := len(ir)
fftLen := nextPowerOfTwo(origLen)
padded := padOrTruncate(ir, fftLen)
log.Printf("[MPT] fftLen: %d, padded len: %d", fftLen, len(padded))
// Convert to complex
signal := make([]complex128, fftLen)
for i, v := range padded {
signal[i] = complex(v, 0)
}
// FFT
X := fft.FFT(signal)
// Log-magnitude spectrum (complex)
logMag := make([]complex128, fftLen)
for i, v := range X {
mag := cmplx.Abs(v)
if mag < 1e-12 {
mag = 1e-12
}
logMag[i] = complex(math.Log(mag), 0)
}
// IFFT to get real cepstrum
cepstrumC := fft.IFFT(logMag)
// Minimum phase cepstrum
minPhaseCep := make([]complex128, fftLen)
minPhaseCep[0] = cepstrumC[0] // DC
for i := 1; i < fftLen/2; i++ {
minPhaseCep[i] = 2 * cepstrumC[i]
}
if fftLen%2 == 0 {
minPhaseCep[fftLen/2] = cepstrumC[fftLen/2] // Nyquist (if even)
}
// Negative quefrency: zero (already zero by default)
// FFT of minimum phase cepstrum
minPhaseSpec := fft.FFT(minPhaseCep)
// Exponentiate to get minimum phase spectrum
for i := range minPhaseSpec {
minPhaseSpec[i] = cmplx.Exp(minPhaseSpec[i])
}
// IFFT to get minimum phase IR
minPhaseIR := fft.IFFT(minPhaseSpec)
// Return the real part, original length
result := make([]float64, origLen)
for i := range result {
result[i] = real(minPhaseIR[i])
}
return result
}
// realSlice extracts the real part of a []complex128 as []float64
func realSlice(in []complex128) []float64 {
out := make([]float64, len(in))
for i, v := range in {
out[i] = real(v)
}
return out
}
// Resample resamples audio data from one sample rate to another using linear interpolation
func Resample(data []float64, fromSampleRate, toSampleRate int) []float64 {
if fromSampleRate == toSampleRate {
return data
}
// Calculate the resampling ratio
ratio := float64(toSampleRate) / float64(fromSampleRate)
newLength := int(float64(len(data)) * ratio)
if newLength == 0 {
return []float64{}
}
result := make([]float64, newLength)
for i := 0; i < newLength; i++ {
// Calculate the position in the original data
pos := float64(i) / ratio
// Get the integer and fractional parts
posInt := int(pos)
posFrac := pos - float64(posInt)
// Linear interpolation
if posInt >= len(data)-1 {
// Beyond the end of the data, use the last sample
result[i] = data[len(data)-1]
} else {
// Interpolate between two samples
sample1 := data[posInt]
sample2 := data[posInt+1]
result[i] = sample1 + posFrac*(sample2-sample1)
}
}
return result
}
// FadeOutLinear applies a linear fade-out to the last fadeSamples of the data.
// fadeSamples is the number of samples over which to fade to zero.
func FadeOutLinear(data []float64, fadeSamples int) []float64 {
if fadeSamples <= 0 || len(data) == 0 {
return data
}
if fadeSamples > len(data) {
fadeSamples = len(data)
}
out := make([]float64, len(data))
copy(out, data)
start := len(data) - fadeSamples
for i := start; i < len(data); i++ {
fade := float64(len(data)-i) / float64(fadeSamples)
out[i] *= fade
}
return out
}
// ApplyLowpassButterworth applies a 2nd-order Butterworth low-pass filter to the data.
// cutoffHz: cutoff frequency in Hz, sampleRate: sample rate in Hz.
func ApplyLowpassButterworth(data []float64, sampleRate int, cutoffHz float64) []float64 {
if cutoffHz <= 0 || cutoffHz >= float64(sampleRate)/2 {
return data
}
// Biquad coefficients
w0 := 2 * math.Pi * cutoffHz / float64(sampleRate)
cosw0 := math.Cos(w0)
sinw0 := math.Sin(w0)
Q := 1.0 / math.Sqrt(2) // Butterworth Q
alpha := sinw0 / (2 * Q)
b0 := (1 - cosw0) / 2
b1 := 1 - cosw0
b2 := (1 - cosw0) / 2
a0 := 1 + alpha
a1 := -2 * cosw0
a2 := 1 - alpha
// Normalize
b0 /= a0
b1 /= a0
b2 /= a0
a1 /= a0
a2 /= a0
// Apply filter (Direct Form I)
out := make([]float64, len(data))
var x1, x2, y1, y2 float64
for i := 0; i < len(data); i++ {
x0 := data[i]
y0 := b0*x0 + b1*x1 + b2*x2 - a1*y1 - a2*y2
out[i] = y0
x2 = x1
x1 = x0
y2 = y1
y1 = y0
}
return out
}
// ApplyHighpassButterworth applies a 2nd-order Butterworth high-pass filter to the data.
// cutoffHz: cutoff frequency in Hz, sampleRate: sample rate in Hz.
func ApplyHighpassButterworth(data []float64, sampleRate int, cutoffHz float64) []float64 {
if cutoffHz <= 0 || cutoffHz >= float64(sampleRate)/2 {
return data
}
// Biquad coefficients
w0 := 2 * math.Pi * cutoffHz / float64(sampleRate)
cosw0 := math.Cos(w0)
sinw0 := math.Sin(w0)
Q := 1.0 / math.Sqrt(2) // Butterworth Q
alpha := sinw0 / (2 * Q)
b0 := (1 + cosw0) / 2
b1 := -(1 + cosw0)
b2 := (1 + cosw0) / 2
a0 := 1 + alpha
a1 := -2 * cosw0
a2 := 1 - alpha
// Normalize
b0 /= a0
b1 /= a0
b2 /= a0
a1 /= a0
a2 /= a0
// Apply filter (Direct Form I)
out := make([]float64, len(data))
var x1, x2, y1, y2 float64
for i := 0; i < len(data); i++ {
x0 := data[i]
y0 := b0*x0 + b1*x1 + b2*x2 - a1*y1 - a2*y2
out[i] = y0
x2 = x1
x1 = x0
y2 = y1
y1 = y0
}
return out
}
// CascadeLowcut applies the low-cut (high-pass) filter multiple times for steeper slopes.
// slopeDb: 12, 24, 36, ... (dB/octave)
func CascadeLowcut(data []float64, sampleRate int, cutoffHz float64, slopeDb int) []float64 {
if slopeDb < 12 {
slopeDb = 12
}
n := slopeDb / 12
out := data
for i := 0; i < n; i++ {
out = ApplyHighpassButterworth(out, sampleRate, cutoffHz)
}
return out
}
// CascadeHighcut applies the high-cut (low-pass) filter multiple times for steeper slopes.
// slopeDb: 12, 24, 36, ... (dB/octave)
func CascadeHighcut(data []float64, sampleRate int, cutoffHz float64, slopeDb int) []float64 {
if slopeDb < 12 {
slopeDb = 12
}
n := slopeDb / 12
out := data
for i := 0; i < n; i++ {
out = ApplyLowpassButterworth(out, sampleRate, cutoffHz)
}
return out
}
// PlotIR plots the frequency response (magnitude in dB vs. frequency in Hz) of the IR to ir_plot.png
func PlotIR(ir []float64, sampleRate int, irFileName string) error {
if len(ir) == 0 {
return nil
}
// Use only the first 8192 samples of the IR for plotting
windowLen := 8192
if len(ir) < windowLen {
windowLen = len(ir)
}
irWin := ir[:windowLen]
X := fft.FFTReal(irWin)
// Plot from 20 Hz up to 20kHz, include every bin
var plotPts plotter.XYs
var minDb float64 = 1e9
var maxDb float64 = -1e9
var minDbFreq float64
freqBins := windowLen / 2
for i := 1; i < freqBins; i++ {
freq := float64(i) * float64(sampleRate) / float64(windowLen)
if freq < 20.0 {
continue
}
if freq > 20000.0 {
break
}
mag := cmplx.Abs(X[i])
if mag < 1e-12 {
mag = 1e-12
}
db := 20 * math.Log10(mag)
plotPts = append(plotPts, plotter.XY{X: freq, Y: db})
if db < minDb {
minDb = db
minDbFreq = freq
}
if db > maxDb {
maxDb = db
}
}
fmt.Printf("[PlotIR] minDb in plotted range: %.2f dB at %.2f Hz\n", minDb, minDbFreq)
p := plot.New()
p.Title.Text = "IR Frequency Response (dB, 2048-sample window)"
p.X.Label.Text = "Frequency (Hz)"
p.Y.Label.Text = "Magnitude (dB)"
p.X.Scale = plot.LogScale{}
p.X.Tick.Marker = plot.TickerFunc(func(min, max float64) []plot.Tick {
ticks := []float64{20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000}
labels := []string{"20", "50", "100", "200", "500", "1k", "2k", "5k", "10k", "20k"}
var result []plot.Tick
for i, v := range ticks {
if v >= min && v <= max {
result = append(result, plot.Tick{Value: v, Label: labels[i]})
}
}
return result
})
line, err := plotter.NewLine(plotPts)
if err != nil {
return err
}
// Set line color to blue
line.Color = color.RGBA{R: 30, G: 100, B: 220, A: 255}
p.Add(line)
// Find minimum dB value between 20 Hz and 50 Hz for y-axis anchor
minDb2050 := 1e9
for i := 1; i < freqBins; i++ {
freq := float64(i) * float64(sampleRate) / float64(windowLen)
if freq < 20.0 {
continue
}
if freq > 50.0 {
break
}
mag := cmplx.Abs(X[i])
if mag < 1e-12 {
mag = 1e-12
}
db := 20 * math.Log10(mag)
if db < minDb2050 {
minDb2050 = db
}
}
p.Y.Min = minDb2050
p.Y.Max = math.Ceil(maxDb)
p.X.Min = 20.0
p.X.Max = 20000.0
// --- Time-aligned waveform plot ---
p2 := plot.New()
p2.Title.Text = "IR Waveform (Time Aligned)"
p2.X.Label.Text = "Time (ms)"
p2.Y.Label.Text = "Amplitude"
// Prepare waveform data (only first 10ms)
var pts plotter.XYs
maxTimeMs := 10.0
for i := 0; i < windowLen; i++ {
t := float64(i) * 1000.0 / float64(sampleRate) // ms
if t > maxTimeMs {
break
}
pts = append(pts, plotter.XY{X: t, Y: irWin[i]})
}
wline, err := plotter.NewLine(pts)
if err != nil {
return err
}
wline.Color = color.RGBA{R: 30, G: 100, B: 220, A: 255}
p2.Add(wline)
p2.X.Min = 0
p2.X.Max = maxTimeMs
// Y range auto
// --- Compose both plots vertically ---
const width = 6 * vg.Inch
const height = 8 * vg.Inch // increased height for frequency diagram
img := vgimg.New(width, height+1*vg.Inch) // extra space for logo/headline
dc := draw.New(img)
// Draw logo at the top left, headline to the right, IR filename below
logoPath := "assets/logo.png"
logoW := 2.4 * vg.Inch // doubled size
logoH := 0.68 * vg.Inch // doubled size
logoX := 0.3 * vg.Inch
logoY := height + 0.2*vg.Inch // move logo down by an additional ~10px
logoDrawn := false
f, err := os.Open(logoPath)
if err == nil {
defer f.Close()
logoImg, err := png.Decode(f)
if err == nil {
rect := vg.Rectangle{
Min: vg.Point{X: logoX, Y: logoY},
Max: vg.Point{X: logoX + logoW, Y: logoY + logoH},
}
dc.DrawImage(rect, logoImg)
logoDrawn = true
}
}
// Draw headline (bold, larger) to the right of the logo
headline := "Valhallir Deconvolver IR Analysis"
fntSize := vg.Points(14) // Same as IR filename
if logoDrawn {
headlineX := logoX + logoW + 0.3*vg.Inch
headlineY := logoY + logoH - vg.Points(16) - vg.Points(5) // move headline up by ~10px
boldFont := plot.DefaultFont
boldFont.Weight = 3 // font.WeightBold is 3 in gonum/plot/font
boldFace := font.DefaultCache.Lookup(boldFont, fntSize)
dc.SetColor(color.Black)
dc.FillString(boldFace, vg.Point{X: headlineX, Y: headlineY}, headline)
// Draw IR filename below headline, left-aligned, standard font
fileLabel := "IR-File: " + filepath.Base(irFileName)
fileY := headlineY - fntSize - vg.Points(6)
fileFace := font.DefaultCache.Lookup(plot.DefaultFont, vg.Points(10))
dc.FillString(fileFace, vg.Point{X: headlineX, Y: fileY}, fileLabel)
}
// Custom tile arrangement: frequency diagram gets more height, waveform gets less
tiles := draw.Tiles{
Rows: 2,
Cols: 1,
PadX: vg.Millimeter,
PadY: 20 * vg.Millimeter, // more space between plots to emphasize frequency diagram
PadTop: vg.Points(15), // move diagrams down by ~20px
}
// Offset the plots down by 1 inch to make space for logo/headline
imgPlots := vgimg.New(width, height)
dcPlots := draw.New(imgPlots)
canvases := plot.Align([][]*plot.Plot{{p}, {p2}}, tiles, dcPlots)
p.Draw(canvases[0][0])
p2.Draw(canvases[1][0])
dc.DrawImage(vg.Rectangle{Min: vg.Point{X: 0, Y: 0}, Max: vg.Point{X: width, Y: height}}, imgPlots.Image())
// Save as PNG
f, err = os.Create("ir_plot.png")
if err != nil {
return err
}
defer f.Close()
_, err = vgimg.PngCanvas{Canvas: img}.WriteTo(f)
return err
}