arduino_midi_player/Midi/music-box-pc-master/rtaudio/contrib/python/pyrtaudio/PyRtAudioTest.py
2025-03-24 14:30:56 +08:00

107 lines
2.9 KiB
Python

from __future__ import print_function
import threading
import rtaudio as rt
from math import cos
import struct
class audio_generator:
def __init__(self):
self.idx = -1
self.freq = 440.
def __call__(self):
self.idx += 1
if self.idx%48000 == 0:
self.freq *= 2**(1/12.)
return 0.5*cos(2.*3.1416*self.freq*self.idx/48000.)
class callback:
def __init__(self, gen):
self.gen = gen
self.i = 0
def __call__(self,playback, capture):
[struct.pack_into("f", playback, 4*o, self.gen()) for o in range(256)]
self.i = self.i + 256
if self.i > 48000*10:
print('.')
return 1
try:
# if we have numpy, replace the above class
import numpy as np
class callback:
def __init__(self, gen):
print('Using Numpy.')
self.freq = 440.
t = np.arange(256, dtype=np.float32) / 48000.0
self.phase = 2*np.pi*t
self.inc = 2*np.pi*256/48000
self.k = 0
def __call__(self, playback, capture):
# Calculate sinusoid using numpy vector operation, as
# opposed to per-sample computations in the generator
# above that must be collected and packed one at a time.
self.k += 256
if self.k > 48000:
self.freq *= 2**(1/12.)
self.k = 0
self.phase += self.inc
samples = 0.5*np.cos(self.phase * self.freq)
# Ensure result is the right size!
assert samples.shape[0] == 256
assert samples.dtype == np.float32
# Use numpy array view to do a once-copy into memoryview
# (ie. we only do a single byte-wise copy of the final
# result into 'playback')
usamples = samples.view(dtype=np.uint8)
playback_array = np.array(playback, copy=False)
np.copyto(playback_array, usamples)
except ModuleNotFoundError:
print('Numpy not available, using struct.')
dac = rt.RtAudio()
n = dac.getDeviceCount()
print('Number of devices available: ', n)
for i in range(n):
try:
print(dac.getDeviceInfo(i))
except rt.RtError as e:
print(e)
print('Default output device: ', dac.getDefaultOutputDevice())
print('Default input device: ', dac.getDefaultInputDevice())
print('is stream open: ', dac.isStreamOpen())
print('is stream running: ', dac.isStreamRunning())
oParams = {'deviceId': 0, 'nChannels': 1, 'firstChannel': 0}
iParams = {'deviceId': 0, 'nChannels': 1, 'firstChannel': 0}
try:
dac.openStream(oParams,oParams,48000,256,callback(audio_generator()) )
except rt.RtError as e:
print(e)
else:
dac.startStream()
import time
print('latency: ', dac.getStreamLatency())
while (dac.isStreamRunning()):
time.sleep(0.1)
print(dac.getStreamTime())
dac.stopStream()
dac.abortStream()
dac.closeStream()