51 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			51 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import numpy as np
 | 
						|
from scipy import interpolate
 | 
						|
 | 
						|
def Singleton(cls):
    """Class decorator that makes ``cls`` produce a single shared instance.

    The decorated name becomes a factory function. Its first call builds
    the instance with whatever arguments it receives; every later call
    returns that same object and ignores its arguments.
    """
    created = {}

    def _singleton(*args, **kwargs):
        # Guard clause: hand back the cached object once it exists.
        if cls in created:
            return created[cls]
        obj = cls(*args, **kwargs)
        created[cls] = obj
        return obj

    return _singleton
 | 
						|
 | 
						|
 | 
						|
@Singleton
class RealtimeAudioDistribution():
    """Per-uuid buffer of audio samples that have been fed but not yet read.

    Wrapped by ``Singleton``, so calling ``RealtimeAudioDistribution()``
    anywhere returns the same shared object.
    """

    def __init__(self) -> None:
        self.data = {}                 # uuid -> samples waiting to be read
        self.max_len = 1024 * 1024     # max number of samples kept per uuid
        # Samples per second. Meant to be read-only for callers; it is
        # overwritten by every feed() with the rate of the incoming chunk.
        self.rate = 48000

    def clean_up(self):
        """Drop all buffered audio for every uuid."""
        self.data = {}

    def feed(self, uuid, audio):
        """Append a chunk of audio to the buffer of ``uuid``.

        Args:
            uuid: Key identifying the buffer to append to.
            audio: ``(rate, samples)`` pair. ``rate`` replaces ``self.rate``;
                ``samples`` is concatenated after anything already buffered.

        Only the newest ``self.max_len`` samples are kept.
        """
        self.rate, chunk = audio
        if uuid in self.data:
            chunk = np.concatenate((self.data[uuid], chunk))
        # Apply the cap on the first chunk too, not only when appending,
        # so a single oversized chunk cannot bypass the limit.
        if len(chunk) > self.max_len:
            chunk = chunk[-self.max_len:]
        self.data[uuid] = chunk

    def read(self, uuid):
        """Remove and return everything buffered for ``uuid``.

        Returns:
            The buffered samples, or ``None`` when nothing is buffered for
            ``uuid``. A one-line progress message (length and peak value)
            is printed as a side effect.
        """
        if uuid not in self.data:
            return None
        res = self.data.pop(uuid)
        # np.max handles multi-channel (2-D) buffers and runs in native code;
        # the builtin max() raised on those and on empty buffers.
        peak = np.max(res) if np.size(res) else None
        print('\r read-', len(res), '-', peak, end='', flush=True)
        return res
 | 
						|
    
 | 
						|
def change_sample_rate(audio, old_sr, new_sr):
    """Resample ``audio`` from ``old_sr`` to ``new_sr`` by linear interpolation.

    Args:
        audio: Array whose first axis is time (``shape[0]`` samples); any
            further axes are interpolated independently.
        old_sr: Sample rate of ``audio`` in samples per second.
        new_sr: Desired sample rate in samples per second.

    Returns:
        ``int16`` array with ``int(audio.shape[0] * new_sr / old_sr)``
        samples along the first axis and the remaining axes unchanged.
    """
    audio = np.asarray(audio)
    n_old = audio.shape[0]
    n_new = int(n_old * new_sr / old_sr)

    # interp1d needs at least two input points. With fewer (or when no output
    # samples are requested) there is nothing to interpolate between, so hold
    # the lone sample, or return an empty result.
    if n_old < 2 or n_new == 0:
        return np.repeat(audio[:1], n_new, axis=0).astype(np.int16)

    duration = n_old / old_sr
    time_old = np.linspace(0, duration, n_old)
    time_new = np.linspace(0, duration, n_new)

    # Interpolate along the time axis (last axis after transposing).
    interpolator = interpolate.interp1d(time_old, audio.T)
    new_audio = interpolator(time_new).T

    # Round to nearest before casting: a bare astype() truncates toward zero,
    # so tiny floating-point error could turn an exact sample into value-1.
    return np.rint(new_audio).astype(np.int16)