#!/usr/bin/env python3
"""
Streaming/Frame-by-Frame Processing Example
Demonstrates real-time processing using frame-by-frame computation.
Useful for online applications where you need to process audio as it arrives.
"""
import numpy as np
import spectrograms as sg
def simulate_streaming_audio(sample_rate, duration, chunk_duration=0.1):
    """Generate a chirp signal and emit it in fixed-size chunks.

    Pre-computes a full frequency-modulated sine sweep (200 Hz rising
    linearly to 800 Hz over the whole duration), then yields successive
    slices of it, mimicking an audio capture device delivering buffers
    in real time.

    Args:
        sample_rate: Sample rate in Hz
        duration: Total duration in seconds
        chunk_duration: Duration of each yielded chunk in seconds

    Yields:
        Consecutive float64 numpy arrays covering the signal in order;
        the final chunk may be shorter than the others.
    """
    n_total = int(sample_rate * duration)
    n_chunk = int(sample_rate * chunk_duration)

    # Synthesize the whole sweep up front: the instantaneous frequency
    # ramps linearly, and the phase is its running integral (cumsum / fs).
    timeline = np.linspace(0, duration, n_total, dtype=np.float64)
    inst_freq = 200 + 600 * (timeline / duration)
    waveform = np.sin(2 * np.pi * np.cumsum(inst_freq) / sample_rate)

    # Hand out consecutive slices until the waveform is exhausted.
    offset = 0
    while offset < n_total:
        yield waveform[offset : min(offset + n_chunk, n_total)]
        offset += n_chunk
def main():
    """Run the streaming-vs-batch spectrogram demo end to end.

    Stages:
      1. Batch: compute a mel power spectrogram over the whole signal at once.
      2. Streaming: re-process the same signal chunk by chunk, computing
         frames incrementally from a rolling sample buffer.
      3. Verification: compare the streaming frames against the batch result.
      4. Use case: per-chunk feature extraction (spectral centroid/rolloff).

    Relies on the project-local ``spectrograms`` module (imported as ``sg``);
    the exact semantics of ``plan.compute_frame`` are not visible from this
    file, so the buffer-management notes below are flagged for review rather
    than asserted as fact.
    """
    print("=" * 60)
    print("Streaming/Frame-by-Frame Processing Example")
    print("=" * 60)

    # Configuration
    sample_rate = 16000
    total_duration = 2.0
    chunk_duration = 0.1  # Process in 100ms chunks

    print("\nStreaming configuration:")
    print(f" Sample rate: {sample_rate} Hz")
    print(f" Total duration: {total_duration} s")
    print(f" Chunk duration: {chunk_duration} s")
    print(f" Chunk size: {int(sample_rate * chunk_duration)} samples")

    # Set up spectrogram parameters
    # (centre=True implies the batch path pads/centres analysis windows —
    # NOTE(review): confirm compute_frame applies the same centring so the
    # streaming/batch comparison below is apples-to-apples)
    stft = sg.StftParams(
        n_fft=512, hop_size=256, window=sg.WindowType.hanning, centre=True
    )
    params = sg.SpectrogramParams(stft, sample_rate=sample_rate)
    mel_params = sg.MelParams(n_mels=40, f_min=0.0, f_max=8000.0)

    # Create plan for efficient processing (reused for every frame/batch call)
    planner = sg.SpectrogramPlanner()
    plan = planner.mel_power_plan(params, mel_params)

    print("\nSTFT parameters:")
    print(f" FFT size: {stft.n_fft}")
    print(f" Hop size: {stft.hop_size}")
    print(f" Mel bands: {mel_params.n_mels}")

    # ========================================================================
    # Method 1: Batch processing (compute entire spectrogram at once)
    # ========================================================================
    print("\n" + "=" * 60)
    print("Method 1: Batch Processing (for comparison)")
    print("=" * 60)

    # Collect all audio first
    all_chunks = list(
        simulate_streaming_audio(sample_rate, total_duration, chunk_duration)
    )
    full_signal = np.concatenate(all_chunks)

    print("\nProcessing full signal at once...")
    print(f" Total samples: {len(full_signal)}")

    batch_spec = plan.compute(full_signal)

    print("\nBatch spectrogram computed:")
    print(f" Shape: {batch_spec.shape}")
    print(f" Duration: {batch_spec.duration():.3f} s")

    # ========================================================================
    # Method 2: Frame-by-frame streaming processing
    # ========================================================================
    print("\n" + "=" * 60)
    print("Method 2: Frame-by-Frame Streaming")
    print("=" * 60)

    # For streaming, we need to buffer audio to compute frames
    # Each frame needs n_fft samples, and we hop by hop_size samples
    frame_buffer = np.array([], dtype=np.float64)
    frame_results = []
    chunks_processed = 0

    print("\nStreaming audio chunks...")
    for chunk in simulate_streaming_audio(sample_rate, total_duration, chunk_duration):
        chunks_processed += 1

        # Add chunk to buffer
        frame_buffer = np.concatenate([frame_buffer, chunk])

        # Compute as many complete frames as possible
        # (floor division makes this <= 0 while the buffer is still shorter
        # than n_fft; the guard below treats that as "no complete frame yet")
        frames_in_buffer = (len(frame_buffer) - stft.n_fft) // stft.hop_size + 1

        if frames_in_buffer > 0:
            print(
                f" Chunk {chunks_processed}: {len(chunk)} samples → {frames_in_buffer} frames"
            )
            for frame_idx in range(frames_in_buffer):
                # Compute single frame
                frame_data = plan.compute_frame(frame_buffer, frame_idx)
                frame_results.append(frame_data)

            # Remove processed samples from buffer (keep overlap for next frames)
            # NOTE(review): this keeps n_fft + (frames_in_buffer - 1) * hop
            # trailing samples, which includes samples already covered by the
            # frames computed above. If compute_frame indexes frames from the
            # start of the buffer, the next iteration's frame 0 will overlap
            # frames already emitted (duplicated/misaligned frames). Verify
            # against compute_frame's indexing convention — dropping
            # frames_in_buffer * stft.hop_size samples instead may be what
            # keeps the streaming output on the batch hop grid.
            samples_to_keep = stft.n_fft + (frames_in_buffer - 1) * stft.hop_size
            frame_buffer = frame_buffer[len(frame_buffer) - samples_to_keep :]

    print("\nStreaming complete:")
    print(f" Chunks processed: {chunks_processed}")
    print(f" Frames computed: {len(frame_results)}")
    print(f" Buffer remaining: {len(frame_buffer)} samples")

    # ========================================================================
    # Verification
    # ========================================================================
    print("\n" + "=" * 60)
    print("Verification: Streaming vs Batch")
    print("=" * 60)

    # Convert streaming frames to array (one column per frame)
    streaming_spec = np.column_stack(frame_results)

    print(f"\nBatch spectrogram shape: {batch_spec.data.shape}")
    print(f"Streaming spectrogram shape: {streaming_spec.shape}")

    # Compare the overlapping portion (frame counts can differ because the
    # two paths handle signal edges differently)
    n_compare_frames = min(batch_spec.n_frames, streaming_spec.shape[1])
    batch_frames = batch_spec.data[:, :n_compare_frames]
    streaming_frames = streaming_spec[:, :n_compare_frames]

    # Check if results match
    matches = np.allclose(batch_frames, streaming_frames, rtol=1e-10)
    print(f"\nComparing first {n_compare_frames} frames:")
    print(f" Results match: {matches}")

    if matches:
        print(" Streaming and batch processing produce identical results!")
    else:
        max_diff = np.max(np.abs(batch_frames - streaming_frames))
        print(f" ✗ Maximum difference: {max_diff:.2e}")

    # ========================================================================
    # Use case: Real-time feature extraction
    # ========================================================================
    print("\n" + "=" * 60)
    print("Use Case: Real-Time Feature Extraction")
    print("=" * 60)

    print("\nSimulating real-time processing...")

    # Process the signal again, but this time extract features from each chunk
    frame_buffer = np.array([], dtype=np.float64)
    chunk_num = 0

    for chunk in simulate_streaming_audio(sample_rate, total_duration, chunk_duration):
        chunk_num += 1
        frame_buffer = np.concatenate([frame_buffer, chunk])
        frames_in_buffer = (len(frame_buffer) - stft.n_fft) // stft.hop_size + 1

        if frames_in_buffer > 0:
            # Extract features from the latest frame
            latest_frame = plan.compute_frame(frame_buffer, frames_in_buffer - 1)

            # Example feature: spectral centroid (weighted mean frequency)
            # NOTE(review): latest_frame comes from a mel power plan, so it is
            # presumably n_mels-long; confirm batch_spec.frequencies gives the
            # matching mel-band centre frequencies (the slice hides any
            # length mismatch rather than flagging it).
            frequencies = np.array(batch_spec.frequencies)[: len(latest_frame)]
            spectral_centroid = np.sum(frequencies * latest_frame) / (
                np.sum(latest_frame) + 1e-10
            )

            # Example feature: spectral rolloff (95% of energy)
            cumsum = np.cumsum(latest_frame)
            rolloff_idx = np.where(cumsum >= 0.95 * cumsum[-1])[0][0]
            spectral_rolloff = frequencies[rolloff_idx]

            print(
                f" Chunk {chunk_num}: centroid={spectral_centroid:.1f} Hz, rolloff={spectral_rolloff:.1f} Hz"
            )

            # Keep necessary samples for next frames
            # NOTE(review): same retained-overlap arithmetic as the streaming
            # loop above — verify alongside it.
            samples_to_keep = stft.n_fft + (frames_in_buffer - 1) * stft.hop_size
            frame_buffer = frame_buffer[len(frame_buffer) - samples_to_keep :]

    # ========================================================================
    # Summary
    # ========================================================================
    print("\n" + "=" * 60)
    print("Summary")
    print("=" * 60)

    print("\nWhen to use frame-by-frame processing:")
    print(" • Real-time audio analysis")
    print(" • Online feature extraction")
    print(" • Low-latency applications")
    print(" • Streaming audio sources")
    print(" • Memory-constrained environments")

    print("\nWhen to use batch processing:")
    print(" • Offline analysis of complete audio files")
    print(" • When you need to look ahead/behind in time")
    print(" • Simpler code, no buffer management")
    print(" • Slightly more efficient for large files")

    print("\nPerformance tip:")
    print(" • Always use a plan (SpectrogramPlanner) for repeated computations")
    print(" • Reuse the plan for all frames in your streaming application")
# Script entry point: run the demo only when executed directly, so the
# module can be imported (e.g. for simulate_streaming_audio) without
# triggering the full example.
if __name__ == "__main__":
    main()