我们从 Python 开源项目中提取了以下 25 个代码示例,用于说明如何使用 scipy.signal.resample()。
def downsample_utterance(features, seglist, n):
    """
    Build one fixed-size embedding per segment by resampling along time.

    Each `(i, j)` pair in `seglist` selects rows `i` through `j` (inclusive)
    of `features`. The slice is transposed, resampled to `n` samples along
    axis 1 with `signal.resample`, and flattened in C order.

    Returns an array holding one flattened embedding per segment.
    """

    def _embed(first, last):
        # Transpose so that the sliced rows lie along axis 1 for resampling.
        segment = features[first:last + 1, :].T
        return signal.resample(segment, n, axis=1).flatten("C")

    return np.asarray([_embed(first, last) for first, last in seglist])


#-----------------------------------------------------------------------------#
#                                MAIN FUNCTION                                #
#-----------------------------------------------------------------------------#
def check_argv():
    """
    Check the command line arguments.

    Builds the argument parser for the downsampling script and parses
    `sys.argv`. When the script is invoked without any argument the help
    text is printed and the process exits with status 1.

    Returns the `argparse.Namespace` with `input_npz_fn`, `output_npz_fn`,
    `n`, `technique` and `frame_dims`.
    """
    # The first line of the module docstring is used as the description.
    # Fall back to an empty string so a module without a docstring (where
    # `__doc__` is None) does not crash with AttributeError.
    summary = (__doc__ or "").strip().split("\n")[0]
    parser = argparse.ArgumentParser(description=summary, add_help=False)
    parser.add_argument("input_npz_fn", type=str, help="input speech file")
    parser.add_argument("output_npz_fn", type=str, help="output embeddings file")
    parser.add_argument("n", type=int, help="number of samples")
    parser.add_argument(
        "--technique",
        choices=["interpolate", "resample", "rasanen"],
        default="resample"
        )
    parser.add_argument(
        "--frame_dims", type=int, default=None,
        help="only keep these number of dimensions"
        )
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
    return parser.parse_args()


#-----------------------------------------------------------------------------#
#                                MAIN FUNCTION                                #
#-----------------------------------------------------------------------------#
def get_intervals(pcg, transitions=None, interval='RR', resize=None):
    """
    Slice `pcg` into the intervals of one type.

    The start and end indices come from `boundaries(transitions, interval)`;
    every `(start, end)` pair yields the slice `pcg[start:end]`.

    Args:
        pcg : numpy array
        transitions : tuple( numpy array begin, numpy array end)
            begin - indices where the intervals start
            end - indices where the intervals end
        interval : string
            Type of interval [RR, S1, Sys, S2, Dia]. Defaults to RR
        resize : int
            when truthy, every interval is resampled to this length

    Returns:
        intervals : list<numpy array>
            list of intervals of the specified type
    """
    segments = []
    for start, stop in zip(*boundaries(transitions, interval)):
        segments.append(pcg[start:stop])
    if not resize:
        return segments
    return [resample(segment, resize) for segment in segments]
def _resample_rdp(self, n):
    """
    Resample Radial Density Profile

    Every entry of `self._df.RadialDensityProfile` is reduced to the first
    component of each of its items, resampled to `n` points and appended to
    `self._df` as `n` new columns. The new column names are also appended
    to `self.feature_names`.

    :param n: int, number of samples in the resampled profile
    :return: nothing
    """
    # range() (not the Python-2-only xrange()) keeps this working on both
    # Python 2 and Python 3.
    # NOTE(review): the 'edp_' prefix looks copied from the edge-density
    # variant; it is kept unchanged because other code may rely on these
    # column names -- confirm whether 'rdp_' was intended.
    columns = ['edp_{}'.format(i) for i in range(n)]
    self.feature_names += columns

    rdp = np.zeros([len(self._df), n])
    for row, profile in enumerate(self._df.RadialDensityProfile.values):
        # Only the first component of every profile item is used.
        samples = np.zeros([len(profile)])
        for pos, item in enumerate(profile):
            samples[pos] = item[0]
        rdp[row] = signal.resample(samples, n)

    df = pd.DataFrame(data=rdp, columns=columns, index=self._df.index)
    self._df = pd.concat([self._df, df], axis=1)
def _resample_edp(self, n):
    """
    Resample Edge Density Profile

    Every entry of `self._df.EdgeDensityProfile` is reduced to the first
    component of each of its items, resampled to `n` points and appended to
    `self._df` as `n` new columns named `edp_0 ... edp_{n-1}`. The new
    column names are also appended to `self.feature_names`.

    :param n: int, number of samples in the resampled profile
    :return: nothing
    """
    # range() (not the Python-2-only xrange()) keeps this working on both
    # Python 2 and Python 3.
    columns = ['edp_{}'.format(i) for i in range(n)]
    self.feature_names += columns

    edp = np.zeros([len(self._df), n])
    for row, profile in enumerate(self._df.EdgeDensityProfile.values):
        # Only the first component of every profile item is used.
        samples = np.zeros([len(profile)])
        for pos, item in enumerate(profile):
            samples[pos] = item[0]
        edp[row] = signal.resample(samples, n)

    df = pd.DataFrame(data=edp, columns=columns, index=self._df.index)
    self._df = pd.concat([self._df, df], axis=1)
def simple_resample(data, original_fs, target_fs):
    """Resample `data` along axis 1 from `original_fs` to `target_fs`.

    Thin wrapper around `scipy.signal.resample`: the output length is
    `data.shape[1] * target_fs / original_fs`, truncated to an int.
    """
    n_out = int(data.shape[1] * target_fs / original_fs)
    return resample(data, num=n_out, axis=1)
def get_name(self):
    """Return the transform identifier: "resample" followed by `self.f` (formatted with %d)."""
    label = "resample%d" % self.f
    return label
def apply(self, data):
    """Downsample `data` along its last axis to `self.f` samples.

    Data whose last axis already has `self.f` samples or fewer is returned
    untouched (the same object, no copy).
    """
    if data.shape[-1] <= self.f:
        return data
    return resample(data, self.f, axis=data.ndim - 1)
def get_name(self):
    """Return the transform identifier: "resample", `self.f` (formatted with %d), then "hanning"."""
    label = "resample%dhanning" % self.f
    return label
def apply(self, data):
    """Resample `data` along its last axis to `self.f` samples using a Hann window.

    The window is requested by name so that `scipy.signal.resample` builds
    it in the layout it expects. An explicit array is applied directly in
    the Fourier domain with the DC bin first; the previous `hann(M=...)`
    array starts at zero, which removed the DC component and attenuated
    the low frequencies instead of tapering the high ones.

    NOTE: the numeric output therefore differs from earlier versions of
    this method; anything derived from the old output must be regenerated.
    """
    return resample(data, self.f, axis=data.ndim - 1, window='hann')
def DTW_features(pcg, transitions, interval='RR', constraint='sakoe_chiba', k=0.1, norm="resample", pre=None, downsample_rate=2, suf=None, sigma=None):
    """
    Compute eight summary statistics of pairwise DTW distances between the
    intervals of one type extracted from `pcg`.

    Steps, in order: preprocess `pcg` with `dtw_preprocess`, extract the
    intervals (resized to `ALENGTH[interval]` when `norm == 'resample'`),
    shrink each interval by `downsample_rate`, build the distance matrix
    with `dtw_distances`, pick as medoid the column with the smallest sum,
    and, when `sigma` is truthy, replace the matrix by
    `-affinity(dist_matrix, sigma)`.

    Returns:
        numpy array with, in order: mean and std of the finite distances
        to the medoid (from `_mean_std`), their 25/50/75 percentiles, and
        the 25/50/75 percentiles of the finite distances between
        consecutive intervals.

    Raises:
        ValueError: if `norm` is neither 'path' nor 'resample' (checked
        only after the intervals have been extracted).

    `suf` is accepted but not used here.
    """
    k = float(k)
    if sigma is not None:
        sigma = float(sigma)

    pcg = dtw_preprocess(pcg, pre=pre)
    if norm == 'resample':
        resize = ALENGTH[interval]
    else:
        resize = None
    intervals = get_intervals(pcg, transitions, interval=interval, resize=resize)
    intervals = [resample(segment, len(segment) // downsample_rate) for segment in intervals]

    if norm not in ['path', 'resample']:
        raise ValueError("Invalid normalization {0}".format(norm))

    dist_matrix = dtw_distances(intervals, n_jobs=-1, constraint=constraint, k=k, path_norm=(norm == 'path'), normalize=True)
    dist_matrix = finite_matrix(dist_matrix)
    # The medoid is chosen on the raw distances, before any affinity transform.
    medoid_index = np.argmin(np.sum(dist_matrix, axis=0))

    if sigma:
        dist_matrix = -affinity(dist_matrix, sigma)

    # Distances to the medoid, with non-finite entries removed.
    to_medoid = dist_matrix[:, medoid_index]
    to_medoid = to_medoid[np.isfinite(to_medoid)]
    medoid_mean, medoid_std = _mean_std(to_medoid)
    medoid_quartiles = np.percentile(to_medoid, [25, 50, 75])

    # Distances between each interval and the next one, non-finite removed.
    neighbours = np.array([dist_matrix[pos, pos + 1] for pos in np.arange(len(dist_matrix) - 1)])
    neighbours = neighbours[np.isfinite(neighbours)]
    neighbour_quartiles = np.percentile(neighbours, [25, 50, 75])

    return np.array([medoid_mean, medoid_std] + list(medoid_quartiles) + list(neighbour_quartiles))
def cinter_DTW_features(pcg, transitions, templates='random_templates', pre=None, constraint='sakoe_chiba', k=0.1, suf=None, sigma=None):
    """
    Compute mean and standard deviation of DTW distances between the
    intervals of `pcg` and a set of stored templates, per interval type.

    `templates` names a file under `TEMPLATE_FOLDER` that is loaded with
    `custom_loadmat`. For each of RR, S1, Sys, S2 and Dia at most the first
    50 intervals, resampled to `ALENGTH[interval] // interDTW_down`, are
    compared against that type's templates with `dtw_distances`. When
    `sigma` is truthy the distances are replaced by `-affinity(d, sigma)`.

    Returns:
        list of ten values: the five means (RR, S1, Sys, S2, Dia) followed
        by the five standard deviations in the same order.

    `suf` is accepted but not used here.
    """
    k = float(k)
    if sigma is not None:
        sigma = float(sigma)

    templates = custom_loadmat(TEMPLATE_FOLDER + templates)
    # NOTE(review): the names are attached to `inter_DTW_features`, not to
    # this function -- confirm that this is intentional.
    inter_DTW_features.names = ["%s_d%02d" % (stat, idx) for stat in ['mean', 'std'] for idx, _ in enumerate(templates)]

    pcg = dtw_preprocess(pcg, pre=pre)

    per_interval = []
    for interval in ['RR', 'S1', 'Sys', 'S2', 'Dia']:
        templates_i = templates[interval]
        segments = get_intervals(pcg, transitions, interval=interval, resize=ALENGTH[interval] // interDTW_down)
        # The intervals are resampled a second time to the same length,
        # then capped at the first 50.
        segments = [resample(segment, ALENGTH[interval] // interDTW_down) for segment in segments][:50]
        dist_matrix = dtw_distances(segments, templates_i, n_jobs=-1, constraint=constraint, k=k)
        dist_matrix = finite_matrix(dist_matrix)
        if sigma:
            dist_matrix = -affinity(dist_matrix, sigma)
        per_interval.append(dist_matrix)

    means = [np.mean(d) for d in per_interval]
    stds = [np.std(d) for d in per_interval]
    features = means + stds
    return features
def preprocess_sample(aud_sample, rate):
    """
    Pre-process a speech sample before feature extraction.

    Steps:
      a. Resample from `rate` to `SAMPLING_RATE`.
         NOTE(review): the original comment said "8 MHz"; presumably
         8 kHz was meant -- confirm against the SAMPLING_RATE constant.
      b. Normalization: if the maximum exceeds 1.0 the data is divided by
         2**15 so that it lies in the range [-1.0, 1.0].
      c. Noise cancellation via `noise_removal`.

    Returns the processed sample.
    """
    # scipy.signal.resample needs an integer sample count; the plain
    # division yields a float under true division (Python 3), so truncate
    # explicitly.
    n_out = int(len(aud_sample) * SAMPLING_RATE / rate)
    proc_sample = signal.resample(aud_sample, n_out)

    if np.max(proc_sample) > 1.0:
        proc_sample = proc_sample * 1.0 / pow(2, 15)

    proc_sample = noise_removal(proc_sample)
    return proc_sample
def resample_gnuradio(file: str, ratio: float):
    """
    Load `file` with `load_gnuradio_file` and resample it to
    `len(samples) * ratio` samples.

    NOTE(review): the original docstring stated "ratio is current_fs /
    desired_fs", but the output length is obtained by *multiplying* with
    `ratio`, which corresponds to desired_fs / current_fs -- confirm with
    the callers which one is intended.
    """
    f = load_gnuradio_file(file)
    # scipy.signal.resample requires an integer number of output samples;
    # round to the nearest integer so float error cannot drop a sample.
    n_out = int(round(len(f) * ratio))
    return signal.resample(f, n_out)
def XsSeg2XaePhon(Xs, Xs_mask, segs, maxLen, nResample=None):
    """
    Cut every utterance in `Xs` into segments and return them as one stack
    of fixed-size targets.

    For each utterance the frames flagged in `Xs_mask` are dropped and the
    remaining frames are split at the positions where `segs` is non-zero.
    Each segment is clipped to `maxLen` frames. Without `nResample` it is
    right-aligned in a zero array of `maxLen` rows; with `nResample` it is
    resampled (or, for a single frame, repeated) to exactly `nResample`
    rows.

    Returns:
        Xae_phon     : stacked targets, shape (n_segments, rows, frame_size)
        deletedChars : number of frames clipped from each segment
        oneLetter    : 1 where the clipped segment has exactly one frame
    """
    FRAME_SIZE = Xs.shape[-1]
    target_len = nResample if nResample else maxLen
    deletedChars = []
    oneLetter = []
    Xae_phon = []
    for i, utt in enumerate(np.split(Xs, len(Xs))):
        utt = np.squeeze(utt, 0)[np.logical_not(Xs_mask[i])]
        words = np.split(utt, np.where(segs[i, :len(utt)])[0])
        # A boundary on the very first frame leaves an empty leading piece.
        if len(words[0]) == 0:
            words.pop(0)
        for frames in words:
            w_len = min(len(frames), maxLen)
            w_target = np.zeros((target_len, FRAME_SIZE))
            deletedChars.append(max(0, len(frames) - maxLen))
            oneLetter.append(int(w_len == 1))
            clipped = frames[:w_len]
            if nResample:
                if w_len > 1:
                    word = resample(clipped, nResample)
                else:
                    word = np.repeat(clipped, nResample, axis=0)
                # The resampled word has exactly nResample rows, i.e. it
                # fills the whole target. (Slicing with -maxLen here broke
                # whenever maxLen < nResample.)
                w_target[:] = word
            else:
                w_target[-w_len:] = clipped
            Xae_phon.append(w_target)
    return np.stack(Xae_phon), np.array(deletedChars), np.array(oneLetter)
def f0transform(self, x, completion=False):
    """Transform F0 of the given waveform by WSOLA followed by resampling.

    The waveform is first passed through
    `self.wsola.duration_modification` and the result is resampled back to
    the original number of samples. The input length is stored in
    `self.xlen`.

    Parameters
    ---------
    x : array, shape (`len(x)`)
        array of waveform sequence
    completion : bool, optional
        Complete the high frequency range of the transformed waveform via
        `self._high_frequency_completion`. Intended for the case where the
        high frequency range is lost by resampling because the F0 ratio is
        smaller than 1.0.

    Returns
    ---------
    transformed : array, shape (`len(x)`)
        Array of F0 transformed waveform sequence

    Raises
    ---------
    ValueError
        If `completion` is enabled while `self.f0rate` is larger than 1.0.
    """
    self.xlen = len(x)

    # WSOLA, then resample back to the original length
    stretched = self.wsola.duration_modification(x)
    transformed = resample(stretched, self.xlen)

    if not completion:
        return transformed

    # Frequency completion is only meaningful when F0 is decreased
    if self.f0rate > 1.0:
        raise ValueError("Do not enable completion if f0rate > 1.")
    return self._high_frequency_completion(x, transformed)
def resampleToLengthN(v, n):
    """Resample a vector, or every row of a matrix, to (about) `n` samples.

    The length is not guaranteed to be exactly `n`: a 1-D input that is
    shorter than `2 * n` is returned unchanged (the same object).

    * 1-D input: resampled to `n` samples with `signal.resample` when
      `int(len(v) / n)` is greater than 1, otherwise returned as is.
    * 2-D input: every row is processed as above and the results are
      stacked vertically.
    * Any other rank: an error message is printed and None is returned.
    """
    rank = len(v.shape)
    if rank == 1:
        factor = int(len(v) / float(n))
        if factor <= 1:
            return v
        return signal.resample(v, n)
    if rank == 2:
        # Build a real list: np.vstack needs a sequence, and on Python 3
        # `map` returns a lazy iterator that it does not accept.
        rows = [resampleToLengthN(row.flatten(), n) for row in v]
        return np.vstack(rows)
    print("Error: can't resample tensor to length N!")
    return None
def warpedSeq(seq, sameLength=True, useI=True, **kwargs):
    """Return a warped copy of `seq`.

    A path of (i, j) pairs is obtained from
    `randWarpingPath(len(seq), **kwargs)`; `seq` is indexed with the i
    values (`useI=True`) or the j values of that path. With `sameLength`
    the warped sequence is resampled back to `len(seq)` samples.
    """
    path = randWarpingPath(len(seq), **kwargs)  # list of (i,j) pairs
    idxs_i, idxs_j = zip(*path)  # tuple of i vals, tuple of j vals
    chosen = idxs_i if useI else idxs_j
    warped = seq[np.asarray(chosen)]
    if not sameLength:
        return warped
    return signal.resample(warped, len(seq))
def createAdversarialSignal(startIdxs, endIdxs, signalLength):
    """Create a zero signal in which randomly paired instances hold a
    sequence from `randwalk` and its negation.

    The instances `[startIdxs[k], endIdxs[k])` (end index exclusive) are
    paired at random; with an odd count one randomly chosen instance is
    used twice. For each pair the first instance is filled with
    `randwalk(length)` and the second with the negated sequence, resampled
    when the two lengths differ.

    Returns the array of length `signalLength`.
    """
    assert(len(startIdxs) == len(endIdxs))
    numInstances = len(startIdxs)

    pairs = np.arange(numInstances)
    if len(pairs) % 2 != 0:  # handle odd numbers of instances
        pairs = np.append(pairs, np.random.choice(pairs))  # duplicate some idx
    np.random.shuffle(pairs)
    pairs = pairs.reshape((-1, 2))  # random

    out = np.zeros(signalLength)
    for first, second in pairs:
        start1, end1 = startIdxs[first], endIdxs[first]
        start2, end2 = startIdxs[second], endIdxs[second]
        # end idxs not inclusive
        length1 = end1 - start1
        length2 = end2 - start2

        subseq = randwalk(length1)
        negSeq = -subseq
        if length1 != length2:
            negSeq = signal.resample(negSeq, length2)

        out[start1:end1] = subseq
        out[start2:end2] = negSeq

    return out


# ================================================================
# Create particular synthetic datasets (for prototyping / smoke testing)
# ================================================================
def main():
    """
    Downsample every array of an input .npz file to a fixed-size vector
    and write the results to a compressed output .npz file.

    The arguments come from `check_argv()`. Each array is limited to the
    first `frame_dims` columns, transposed, and reduced to `n` points along
    axis 1 with the selected technique ("interpolate", "resample" or
    "rasanen"); the result is flattened with the module-level
    `flatten_order`.

    The print calls take a single pre-formatted string and the first key
    is fetched through `list(...)`, so the function runs and prints the
    same text under both Python 2 and Python 3.
    """
    args = check_argv()

    print("Reading: {}".format(args.input_npz_fn))
    input_npz = np.load(args.input_npz_fn)
    first_key = list(input_npz.keys())[0]
    d_frame = input_npz[first_key].shape[1]

    print("Frame dimensionality: {}".format(d_frame))
    if args.frame_dims is not None and args.frame_dims < d_frame:
        d_frame = args.frame_dims
        print("Reducing frame dimensionality: {}".format(d_frame))

    print("Downsampling: {}".format(args.technique))
    output_npz = {}
    for key in input_npz:

        # Limit input dimensionality
        y = input_npz[key][:, :args.frame_dims].T

        # Downsample
        if args.technique == "interpolate":
            x = np.arange(y.shape[1])
            f = interpolate.interp1d(x, y, kind="linear")
            x_new = np.linspace(0, y.shape[1] - 1, args.n)
            y_new = f(x_new).flatten(flatten_order)
        elif args.technique == "resample":
            y_new = signal.resample(y, args.n, axis=1).flatten(flatten_order)
        elif args.technique == "rasanen":
            # Taken from Rasanen et al., Interspeech, 2015: drop the
            # trailing frames that do not fill a multiple of n, then
            # average each of the n equal chunks.
            n_frames_in_multiple = int(np.floor(y.shape[1] / args.n)) * args.n
            y_new = np.mean(
                y[:, :n_frames_in_multiple].reshape((d_frame, args.n, -1)), axis=-1
                ).flatten(flatten_order)

            # This was done in Rasanen et al., Interspeech, 2015, but didn't help here
            # last_term = args.n/3. * np.log10(y.shape[1] * 10e-3)  # Not sure if this should be in frames or ms
            # y_new = np.hstack([y_new, last_term])

        # Save result
        output_npz[key] = y_new

    first_out_key = list(output_npz.keys())[0]
    print("Output dimensionality: {}".format(output_npz[first_out_key].shape[0]))

    print("Writing: {}".format(args.output_npz_fn))
    np.savez_compressed(args.output_npz_fn, **output_npz)
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
    """
    Construct a sinusoidal model for the input signal.

    The signal is decimated to 8 kHz when needed, analysed with an order-8
    LPC model and the LPC result is converted to frequencies and
    magnitudes.

    Parameters
    ----------
    X : ndarray
        Input signal to model

    input_sample_rate : int
        The sample rate of the input signal. Must be 8000 or a multiple
        of it.

    resample_block : int, optional (default=128)
        Controls the step size of the sinusoidal model

    copy : bool, optional (default=True)
        Forwarded to ``np.array`` when converting ``X``.

    Returns
    -------
    frequencies_hz : ndarray
        Frequencies for the sinusoids, in Hz.

    magnitudes : ndarray
        Magnitudes of sinusoids returned in ``frequencies``

    Raises
    ------
    ValueError
        If ``input_sample_rate`` is not a multiple of 8000.

    References
    ----------
    D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
    Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
    """
    X = np.array(X, copy=copy)
    resample_to = 8000
    needs_resampling = input_sample_rate != resample_to
    if needs_resampling and input_sample_rate % resample_to != 0:
        raise ValueError("Input sample rate must be a multiple of 8k!")
    if needs_resampling:
        # Decimation is used here; the original author left a note that
        # sg.resample with a window "should" also be usable.
        X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)

    step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
    coefficients, gains, _residual = lpc_analysis(
        X, order=8, window_step=step_size, window_size=2 * step_size)
    freqs, magnitudes = lpc_to_frequency(coefficients, gains)
    # Convert from radians per sample to Hz at the 8 kHz rate.
    freqs_hz = freqs * resample_to / (2 * np.pi)
    return freqs_hz, magnitudes
def XsSeg2Xae(Xs, Xs_mask, segs, maxUtt, maxLen, nResample=None, check_output=False):
    """
    Cut every utterance in `Xs` into words and pack them into a fixed-size
    array of shape (n_utts, maxUtt, rows, frame_size).

    For each utterance the frames flagged in `Xs_mask` are dropped and the
    remaining frames are split at the positions where `segs` is non-zero.
    At most `maxUtt` words are kept; they are placed in the last slots of
    the utterance so that padding comes first. Each word is clipped to
    `maxLen` frames. Without `nResample` it is right-aligned in `maxLen`
    rows; with `nResample` it is resampled (or, for a single frame,
    repeated) to exactly `nResample` rows.

    Returns:
        Xae          : the packed array
        deletedChars : (n_utts, maxUtt) frames clipped per word, plus the
                       frames of the words beyond `maxUtt` spread evenly
                       over all slots
        oneLetter    : (n_utts, maxUtt) 1 where the clipped word has
                       exactly one frame

    Raises:
        AssertionError: with `check_output`, if the frames recovered from
        the output differ from the unmasked source frames.
    """
    utterances = np.split(Xs, len(Xs))
    FRAME_SIZE = Xs.shape[-1]
    target_len = nResample if nResample else maxLen
    deletedChars = np.zeros((len(utterances), maxUtt))
    oneLetter = np.zeros((len(utterances), maxUtt))
    packed = []
    for i, utt in enumerate(utterances):
        utt_target = np.zeros((maxUtt, target_len, FRAME_SIZE))
        utt = np.squeeze(utt, 0)[np.logical_not(Xs_mask[i])]
        utt = np.split(utt, np.where(segs[i, :len(utt)])[0])
        # A boundary on the very first frame leaves an empty leading piece.
        if len(utt[0]) == 0:
            utt.pop(0)
        n_words = min(len(utt), maxUtt)
        padwords = maxUtt - n_words
        for j in range(n_words):
            w_len = min(len(utt[j]), maxLen)
            w_target = np.zeros((target_len, FRAME_SIZE))
            deletedChars[i, padwords + j] += max(0, len(utt[j]) - maxLen)
            oneLetter[i, padwords + j] += int(w_len == 1)
            clipped = utt[j][:w_len]
            if nResample:
                if w_len > 1:
                    word = resample(clipped, nResample)
                else:
                    word = np.repeat(clipped, nResample, axis=0)
                # The resampled word has exactly nResample rows, i.e. it
                # fills the whole target. (Slicing with -maxLen here broke
                # whenever maxLen < nResample.)
                w_target[:] = word
            else:
                w_target[-w_len:] = clipped
            utt_target[padwords + j] = w_target
        extraWDel = sum(len(utt[j]) for j in range(maxUtt, len(utt)))
        ## Uniformly distribute clipping penalty for excess words
        deletedChars[i, :] += float(extraWDel) / maxUtt
        packed.append(utt_target)
    Xae = np.stack(packed)
    ## NOTE: Reconstitution will fail if there has been any clipping.
    ## Do not use this feature unless maxutt and maxlen are large enough
    ## to make clipping very unlikely.
    ## Currently only works in acoustic mode.
    if check_output:
        for i in range(len(Xs)):
            src = Xs[i][np.logical_not(Xs_mask[i])]
            target = Xae[i]
            reconstituted = np.zeros((0, FRAME_SIZE))
            for wi in range(maxUtt):
                # Keep only the rows that contain at least one non-zero value.
                w = target[wi][np.where(target[wi].any(-1))]
                reconstituted = np.concatenate([reconstituted, w])
            for j in range(len(src)):
                assert np.allclose(src[j], reconstituted[j]), \
                    '''Reconstitution of MFCC frames failed at timestep %d. Source region: %s\n Reconstituted region: %s''' \
                    % (j, src[j-1:j+2], reconstituted[j-1:j+2])
    return Xae, deletedChars, oneLetter
def demodulate_array(h, soft_lcd):
    """Demodulate the complex sample array `h`, decode the packets found
    in it and hand them to `soft_lcd.update_state`.

    Pipeline, as implemented below: phase difference between consecutive
    samples -> band-pass (`basebandBP`) -> resample by 228/256 -> mix with
    an alternating +1/-1 sequence into two branches -> low-pass (`filtLP`)
    and pulse filter (`pulseFilt`) -> `symbol_recovery_24` -> search for
    valid 26-bit blocks with `rds_crc` -> decode every run of four blocks
    labelled A, B, C, D that are spaced exactly 26 bits apart.

    The recovered bits, the hits and the decoded 'A' blocks are printed.
    """
    # primary worker function, credit to all
    phase_step = np.angle(h[1:] * np.conj(h[:-1]))
    baseband = signal.convolve(phase_step, basebandBP)

    # resample from 256kHz to 228kHz
    rdsBand = signal.resample(baseband, int(len(baseband) * 228e3 / 256e3))

    # length modulo 4
    usable = (len(rdsBand) // 4) * 4
    rdsBand = rdsBand[:usable]

    # Alternating-sign carrier applied to the even and odd samples.
    c57 = numpy.tile([1., -1.], len(rdsBand) // 4)
    xi = rdsBand[::2] * c57
    xq = rdsBand[1::2] * (-c57)

    xfi = signal.convolve(xi, filtLP)
    xfq = signal.convolve(xq, filtLP)
    xsfi = signal.convolve(xfi, pulseFilt)
    xsfq = signal.convolve(xfq, pulseFilt)

    # Drop the last sample so the length is even before pairing samples.
    if len(xsfi) % 2 == 1:
        xsfi = xsfi[:-1]
        xsfq = xsfq[:-1]

    xdi = (xsfi[::2] + xsfi[1::2]) / 2
    xdq = xsfq[::2]

    res = symbol_recovery_24(xdi, xdq)

    # Every offset at which rds_crc reports a (truthy) block label.
    hits = []
    for offset in range(len(res) - 26):
        label = rds_crc(res, offset, 26)
        if label:
            hits.append((offset, label))
    print(res, hits)

    packets = []
    print([decode_one(res, hit[0]) for hit in hits if hit[1] == 'A'])
    for first in range(len(hits) - 3):
        if hits[first][1] != "A":
            continue
        # The four hits starting here must be labelled A, B, C, D and be
        # spaced exactly 26 positions apart.
        bogus = False
        for step, expected in enumerate("ABCD"):
            if 26 * step != hits[first + step][0] - hits[first][0]:
                bogus = True
            if hits[first + step][1] != expected:
                bogus = True
        if bogus:
            continue
        for step in range(4):
            packets.append(decode_one(res, hits[first + step][0]))

    soft_lcd.update_state(packets)