我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.complex()。
def cmdct(x, odd=True): """ Calculate complex modified discrete cosine transform of input inefficient pure-Python method. Use only for testing. Parameters ---------- X : array_like The input signal odd : boolean, optional Switch to oddly stacked transform. Defaults to :code:`True`. Returns ------- out : array_like The output signal """ return trans(x, func=lambda x: numpy.cos(x) - 1j * numpy.sin(x), odd=odd)
def icmdct(X, odd=True): """ Calculate inverse complex modified discrete cosine transform of input signal in an inefficient pure-Python method. Use only for testing. Parameters ---------- X : array_like The input signal odd : boolean, optional Switch to oddly stacked transform. Defaults to :code:`True`. Returns ------- out : array_like The output signal """ return itrans(X, func=lambda x: numpy.cos(x) + 1j * numpy.sin(x), odd=odd)
def sphankel1(n, kr): """Spherical Hankel (first kind) of order n at kr Parameters ---------- n : array_like Order kr: array_like Argument Returns ------- hn1 : complex float Spherical Hankel function hn (first kind) """ n, kr = scalar_broadcast_match(n, kr) hn1 = _np.full(n.shape, _np.nan, dtype=_np.complex_) kr_nonzero = kr != 0 hn1[kr_nonzero] = _np.sqrt(_np.pi / 2) / _np.lib.scimath.sqrt(kr[kr_nonzero]) * hankel1(n[kr_nonzero] + 0.5, kr[kr_nonzero]) return hn1
def sphankel2(n, kr): """Spherical Hankel (second kind) of order n at kr Parameters ---------- n : array_like Order kr: array_like Argument Returns ------- hn2 : complex float Spherical Hankel function hn (second kind) """ n, kr = scalar_broadcast_match(n, kr) hn2 = _np.full(n.shape, _np.nan, dtype=_np.complex_) kr_nonzero = kr != 0 hn2[kr_nonzero] = _np.sqrt(_np.pi / 2) / _np.lib.scimath.sqrt(kr[kr_nonzero]) * hankel2(n[kr_nonzero] + 0.5, kr[kr_nonzero]) return hn2
def dspbessel(n, kr): """Derivative of spherical Bessel (first kind) of order n at kr Parameters ---------- n : array_like Order kr: array_like Argument Returns ------- J' : complex float Derivative of spherical Bessel """ return _np.squeeze((n * spbessel(n - 1, kr) - (n + 1) * spbessel(n + 1, kr)) / (2 * n + 1))
def dsphankel1(n, kr): """Derivative spherical Hankel (first kind) of order n at kr Parameters ---------- n : array_like Order kr: array_like Argument Returns ------- dhn1 : complex float Derivative of spherical Hankel function hn' (second kind) """ n, kr = scalar_broadcast_match(n, kr) dhn1 = _np.full(n.shape, _np.nan, dtype=_np.complex_) kr_nonzero = kr != 0 dhn1[kr_nonzero] = 0.5 * (sphankel1(n[kr_nonzero] - 1, kr[kr_nonzero]) - sphankel1(n[kr_nonzero] + 1, kr[kr_nonzero]) - sphankel1(n[kr_nonzero], kr[kr_nonzero]) / kr[kr_nonzero]) return dhn1
def dsphankel2(n, kr): """Derivative spherical Hankel (second kind) of order n at kr Parameters ---------- n : array_like Order kr: array_like Argument Returns ------- dhn2 : complex float Derivative of spherical Hankel function hn' (second kind) """ n, kr = scalar_broadcast_match(n, kr) dhn2 = _np.full(n.shape, _np.nan, dtype=_np.complex_) kr_nonzero = kr != 0 dhn2[kr_nonzero] = 0.5 * (sphankel2(n[kr_nonzero] - 1, kr[kr_nonzero]) - sphankel2(n[kr_nonzero] + 1, kr[kr_nonzero]) - sphankel2(n[kr_nonzero], kr[kr_nonzero]) / kr[kr_nonzero]) return dhn2
def sph_harm_all(nMax, az, el, type='complex'): '''Compute all sphercial harmonic coefficients up to degree nMax. Parameters ---------- nMax : (int) Maximum degree of coefficients to be returned. n >= 0 az: (float), array_like Azimuthal (longitudinal) coordinate [0, 2pi], also called Theta. el : (float), array_like Elevation (colatitudinal) coordinate [0, pi], also called Phi. Returns ------- y_mn : (complex float), array_like Complex spherical harmonics of degrees n [0 ... nMax] and all corresponding orders m [-n ... n], sampled at [az, el]. dim1 corresponds to az/el pairs, dim2 to oder/degree (m, n) pairs like 0/0, -1/1, 0/1, 1/1, -2/2, -1/2 ... ''' m, n = mnArrays(nMax) mA, azA = _np.meshgrid(m, az) nA, elA = _np.meshgrid(n, el) return sph_harm(mA, nA, azA, elA, type=type)
def test_simple_complex(self): # Test native complex input with native double scalar min/max. # Test native input with complex double scalar min/max. a = 3 * self._generate_data_complex(self.nr, self.nc) m = -0.5 M = 1. ac = self.fastclip(a, m, M) act = self.clip(a, m, M) assert_array_strict_equal(ac, act) # Test native input with complex double scalar min/max. a = 3 * self._generate_data(self.nr, self.nc) m = -0.5 + 1.j M = 1. + 2.j ac = self.fastclip(a, m, M) act = self.clip(a, m, M) assert_array_strict_equal(ac, act)
def test_against_cmath(self): import cmath points = [-1-1j, -1+1j, +1-1j, +1+1j] name_map = {'arcsin': 'asin', 'arccos': 'acos', 'arctan': 'atan', 'arcsinh': 'asinh', 'arccosh': 'acosh', 'arctanh': 'atanh'} atol = 4*np.finfo(np.complex).eps for func in self.funcs: fname = func.__name__.split('.')[-1] cname = name_map.get(fname, fname) try: cfunc = getattr(cmath, cname) except AttributeError: continue for p in points: a = complex(func(np.complex_(p))) b = cfunc(p) assert_(abs(a - b) < atol, "%s %s: %s; cmath: %s" % (fname, p, a, b))
def test_complex_nan_comparisons(): nans = [complex(np.nan, 0), complex(0, np.nan), complex(np.nan, np.nan)] fins = [complex(1, 0), complex(-1, 0), complex(0, 1), complex(0, -1), complex(1, 1), complex(-1, -1), complex(0, 0)] with np.errstate(invalid='ignore'): for x in nans + fins: x = np.array([x]) for y in nans + fins: y = np.array([y]) if np.isfinite(x) and np.isfinite(y): continue assert_equal(x < y, False, err_msg="%r < %r" % (x, y)) assert_equal(x > y, False, err_msg="%r > %r" % (x, y)) assert_equal(x <= y, False, err_msg="%r <= %r" % (x, y)) assert_equal(x >= y, False, err_msg="%r >= %r" % (x, y)) assert_equal(x == y, False, err_msg="%r == %r" % (x, y))
def test_basic(self): dt_numeric = np.typecodes['AllFloat'] + np.typecodes['AllInteger'] dt_complex = np.typecodes['Complex'] # test real a = np.eye(3) for dt in dt_numeric + 'O': b = a.astype(dt) res = np.vdot(b, b) assert_(np.isscalar(res)) assert_equal(np.vdot(b, b), 3) # test complex a = np.eye(3) * 1j for dt in dt_complex + 'O': b = a.astype(dt) res = np.vdot(b, b) assert_(np.isscalar(res)) assert_equal(np.vdot(b, b), 3) # test boolean b = np.eye(3, dtype=np.bool) res = np.vdot(b, b) assert_(np.isscalar(res)) assert_equal(np.vdot(b, b), True)
def set_params(self, values): """ Coverts the symbolic values into numerical values. The order of the parameters must be the same as the one given in the class instantiaion. :param list values: a list containing the values for the sympy symbols. :return: None :rtype: None :raises: RuntimeError """ if self._symFClambda != None: self.fc_set( np.array(self._symFClambda(*values), \ dtype=np.complex), \ self._inputType) else: raise RuntimeError("Symbolic FC not defined!")
def test_dipolar_tensor(self): # initial stupid test... ###### TODO : do a reasonable test!!! ###### p = np.array([[0.,0.,0.]]) fc = np.array([[0.,0.,1.]],dtype=np.complex) k = np.array([0.,0.,0.0]) phi= np.array([0.,]) mu = np.array([0.5,0.5,0.5]) sc = np.array([10,10,10],dtype=np.int32) latpar = np.diag([2.,2.,2.]) r = 10. res = lfcext.DipolarTensor(p,mu,sc,latpar,r) np.testing.assert_array_almost_equal(res, np.zeros([3,3])) mu = np.array([0.25,0.25,0.25]) res = lfcext.DipolarTensor(p,mu,sc,latpar,r) np.testing.assert_array_almost_equal(np.trace(res), np.zeros([3])) np.testing.assert_array_almost_equal(res, res.copy().T)
def update_descriptors(self): logger.debug("Updating Plotter %s descriptors based on input descriptor %s", self.name, self.sink.descriptor) self.stream = self.sink.input_streams[0] self.descriptor = self.sink.descriptor try: self.time_pts = self.descriptor.axes[self.descriptor.axis_num("time")].points self.record_length = len(self.time_pts) except ValueError: raise ValueError("Single shot filter sink does not appear to have a time axis!") self.num_segments = len(self.sink.descriptor.axes[self.descriptor.axis_num("segment")].points) self.ground_data = np.zeros((self.record_length, self.num_segments//2), dtype=np.complex) self.excited_data = np.zeros((self.record_length, self.num_segments//2), dtype=np.complex) output_descriptor = DataStreamDescriptor() output_descriptor.axes = [_ for _ in self.descriptor.axes if type(_) is SweepAxis] output_descriptor._exp_src = self.sink.descriptor._exp_src output_descriptor.dtype = np.complex128 for os in self.fidelity.output_streams: os.set_descriptor(output_descriptor) os.end_connector.update_descriptors()
def __init__(self, module, min_delay_ms, autostart=True): self._module = module self._min_delay_ms = min_delay_ms self.current_point = 0 self.current_avg = 0 self.n_points = self._module.points self._paused = True self._fut = None # placeholder for next point future self.never_started = True super(NaCurveFuture, self).__init__() self.data_x = copy(self._module._data_x) # In case of saving latter. self.data_avg = np.empty(self.n_points, dtype=np.complex) self.data_avg.fill(np.nan) self.data_amp = np.empty(self.n_points) self.data_amp.fill(np.nan) # self.start() self._reset_benchmark() self.measured_time_per_point = np.nan # measured over last scan if autostart: self.start()
def threshold_hook(self, current_val): # goes in the module... """ A convenience function to stop the run upon some condition (such as reaching of a threshold. current_val is the complex amplitude of the last data point). To be overwritten in derived class... Parameters ---------- current_val Returns ------- """ pass # Concrete implementation of AcquisitionModule methods and attributes: # --------------------------------------------------------------------
def set_value(self, obj, value): """ the master's setter writes its value to the slave lists """ real, complex = [], [] for v in value: # separate real from complex values if np.imag(v) == 0: real.append(v.real) else: complex.append(v) # avoid calling setup twice with obj.do_setup: setattr(obj, 'complex_' + self.name, complex) setattr(obj, 'real_' + self.name, real) # this property should have call_setup=True, such that obj._setup() # is called automatically after this function
def list_changed(self, module, operation, index, value=None): """ make sure that an element from one of the four lists is selected at once""" if operation == 'select': # unselect all others if not hasattr(module, '_selecting') or not getattr(module, '_selecting'): try: setattr(module, '_selecting', True) for name in [start+'_'+end for start in ['real', 'complex'] for end in ['poles', 'zeros']]: if name != self.name: getattr(module, name).selected = None module._logger.info('%s.selected = None', name) setattr(module, '_selected_pole_or_zero', self.name) setattr(module, '_selected_index', index) finally: setattr(module, '_selecting', False) super(IirFloatListProperty, self).list_changed(module, operation, index, value=value) module._signal_launcher.update_plot.emit() else: super(IirFloatListProperty, self).list_changed(module, operation, index, value=value)
def freqz_(sys, w, dt=8e-9): """ This function computes the frequency response of a zpk system at an array of frequencies. It loosely mimicks 'scipy.signal.frequresp'. Parameters ---------- system: (zeros, poles, k) zeros and poles both in rad/s, k is the actual coefficient, not DC gain w: np.array frequencies in rad/s dt: sampling time Returns ------- np.array(..., dtype=np.complex) with the response """ z, p, k = sys b, a = sig.zpk2tf(z, p, k) _, h = sig.freqz(b, a, worN=w*dt) return h
def tf_inputfilter(self, inputfilter=None, frequencies=None): # input # anti aliasing filter and additional delay model # delay comes from latency in fpga + on average half sampling time moduledelay = self.moduledelay + self.loops / 2.0 if inputfilter is None: inputfilter = self.inputfilter if frequencies is None: frequencies = self.frequencies frequencies = np.asarray(frequencies, dtype=np.complex) try: len(inputfilter) except: inputfilter = [inputfilter] # make it iterable tf = frequencies*0 + 1.0 for f in inputfilter: if f > 0: # lowpass tf /= (1.0 + 1j * frequencies / f) moduledelay += 1 elif f < 0: # highpass tf /= (1.0 + 1j * f / frequencies) moduledelay += 2 # add delay delay = moduledelay * self.dt # + extradelay tf *= np.exp(-1j*delay*frequencies*2*np.pi) return tf
def tf_partialfraction(self, frequencies=None): """ Returns the transfer function just before the partial fraction expansion for frequencies. Parameters ---------- sys: (poles, zeros, k) dt: sampling time continuous: if True, returns the transfer function in continuous time domain, if False converts to discrete one method: method for scipy.signal.cont2discrete alpha: alpha for above method (see scipy documentation) Returns ------- np.array(..., dtype=np.complex) """ # this code is more or less a direct copy of get_coeff() if frequencies is None: frequencies = self.frequencies frequencies = np.asarray(frequencies, dtype=np.complex128) r, p, c = self.rp_continuous h = freqs_rp(r, p, c, frequencies*2*np.pi) return h * self.tf_inputfilter(frequencies=frequencies)
def _filter_transfer_function(cls, frequencies, filter_values, frequency_correction=1.): """ Transfer function of the inputfilter part of a pid module """ frequencies = np.array(frequencies, dtype=np.complex) module_delay = 0 tf = np.ones(len(frequencies), dtype=complex) # input filter modelisation if not isinstance(filter_values, list): filter_values = list([filter_values]) for f in filter_values: if f == 0: continue elif f > 0: # lowpass tf /= (1.0 + 1j * frequencies / f) module_delay += 2 # two cycles extra delay per lowpass elif f < 0: # highpass tf /= (1.0 + 1j * f / frequencies) # plus is correct here since f already has a minus sign module_delay += 1 # one cycle extra delay per highpass delay = module_delay * 8e-9 / frequency_correction tf *= np.exp(-1j * delay * frequencies * 2 * np.pi) return tf
def merge_waveform(n, chAB, chAm1, chAm2, chBm1, chBm2): ''' Builds packed I and Q waveforms from the nth mini LL, merging in marker data. ''' wfAB = np.array([], dtype=np.complex) for entry in chAB['linkList'][n % len(chAB['linkList'])]: if not entry.isTimeAmp: wfAB = np.append(wfAB, chAB['wfLib'][entry.key]) else: wfAB = np.append(wfAB, chAB['wfLib'][entry.key][0] * np.ones(entry.length * entry.repeat)) wfAm1 = marker_waveform(chAm1['linkList'][n % len(chAm1['linkList'])], chAm1['wfLib']) wfAm2 = marker_waveform(chAm2['linkList'][n % len(chAm2['linkList'])], chAm2['wfLib']) wfBm1 = marker_waveform(chBm1['linkList'][n % len(chBm1['linkList'])], chBm1['wfLib']) wfBm2 = marker_waveform(chBm2['linkList'][n % len(chBm2['linkList'])], chBm2['wfLib']) wfA = pack_waveform(np.real(wfAB), wfAm1, wfAm2) wfB = pack_waveform(np.imag(wfAB), wfBm1, wfBm2) return wfA, wfB
def CLEAR(amp=1, length=0, sigma=0, sampling_rate=1e9, **params): """ Pulse shape to quickly deplete the cavity at the end of a measurement. measPulse followed by 2 steps of length step_length and amplitudes amp1, amp2. """ if 'amp1' not in params: params['amp1'] = 0 if 'amp2' not in params: params['amp2'] = 0 if 'step_length' not in params: params['step_length'] = 100e-9 timePts = (1.0 / sampling_rate) * np.arange(np.round((length-2*params['step_length']) * sampling_rate)) flat_step = amp * (0.6 * np.exp(-timePts / sigma) + 0.4).astype(np.complex) numPts_clear_step = int(np.round(params['step_length'] * sampling_rate)) clear_step_one = amp * params['amp1'] * np.ones(numPts_clear_step, dtype=np.complex) clear_step_two = amp * params['amp2'] * np.ones(numPts_clear_step, dtype=np.complex) return np.append(flat_step, [clear_step_one, clear_step_two])
def iqplot(data, spec='.', labels=None): """Plot signal points. :param data: complex baseband signal points :param spec: plot specifier (see :func:`matplotlib.pyplot.plot`) :param labels: label for each signal point >>> import arlpy >>> arlpy.comms.iqplot(arlpy.comms.psk(8)) >>> arlpy.comms.iqplot(arlpy.comms.qam(16), 'rx') >>> arlpy.comms.iqplot(arlpy.comms.psk(4), labels=['00', '01', '11', '10']) """ import matplotlib.pyplot as plt data = _np.asarray(data) if labels is None: plt.plot(data.real, data.imag, spec) else: if labels == True: labels = range(len(data)) for i in range(len(data)): plt.text(data[i].real, data[i].imag, str(labels[i])) plt.axis([-2, 2, -2, 2]) plt.grid() plt.show()
def diff_decode(x): """Decode phase differential baseband signal. :param x: complex baseband differential data to decode :returns: decoded complex baseband data of length len(x)-1 >>> import arlpy >>> d1 = arlpy.comms.random_data(100, 4) >>> qpsk = arlpy.comms.psk(4) >>> x = arlpy.comms.modulate(d1, qpsk) >>> y = arlpy.comms.diff_encode(x) >>> z = arlpy.comms.diff_decode(y) >>> d2 = arlpy.comms.demodulate(z, qpsk) >>> arlpy.comms.ser(d1, d2) 0.0 """ x = _np.asarray(x) y = _np.array(x) y[1:] *= x[:-1].conj() return y[1:]
def pb2bb(x, fs, fc, fd=None, flen=127, cutoff=None): """Convert passband signal to baseband. The baseband conversion uses a low-pass filter after downconversion, with a default cutoff frequency of `0.6*fd`, if `fd` is specified, or `1.1*fc` if `fd` is not specified. Alternatively, the user may specify the cutoff frequency explicitly. For communication applications, one may wish to use :func:`arlpy.comms.downconvert` instead, as that function supports matched filtering with a pulse shape rather than a generic low-pass filter. :param x: passband signal :param fs: sampling rate of passband signal in Hz :param fc: carrier frequency in passband in Hz :param fd: sampling rate of baseband signal in Hz (``None`` => same as `fs`) :param flen: number of taps in the low-pass FIR filter :param cutoff: cutoff frequency in Hz (``None`` means auto-select) :returns: complex baseband signal, sampled at `fd` """ if cutoff is None: cutoff = 0.6*fd if fd is not None else 1.1*fc y = x * _np.sqrt(2)*_np.exp(2j*_np.pi*fc*time(x,fs)) hb = _sig.firwin(flen, cutoff=cutoff, nyq=fs/2.0) y = _sig.filtfilt(hb, 1, y) if fd is not None and fd != fs: y = _sig.resample_poly(y, 2*fd, fs)[::2] return y
def estimate(self, phases, phases_lohi): num_ts, ts_len = np.shape(phases) self.pairs = [(r1, r2) for r1 in xrange(0, num_ts) for r2 in xrange(r1, num_ts)] pacs_ts = np.zeros((num_ts, num_ts, ts_len), dtype=np.complex) pacs_avg = np.zeros((num_ts, num_ts)) for pair in self.pairs: p1, p2 = pair phase1 = phases[p1, ] phase1_lohi = phases_lohi[p2, ] ts, avg = self.estimator.estimate_pair(phase1, phase1_lohi) pacs_ts[pair] = ts pacs_avg[pair] = avg return pacs_ts, pacs_avg
def fill_value(dtype): '''Get a fill-value for a given dtype Parameters ---------- dtype : type Returns ------- `np.nan` if `dtype` is real or complex 0 otherwise ''' if np.issubdtype(dtype, np.float) or np.issubdtype(dtype, np.complex): return dtype(np.nan) return dtype(0)
def mslin2circ(incol, outcol, outms, skipmetadata): tc = pt.table(outms, readonly=False, ack=False) dataXY = tc.getcol(incol) I=numpy.complex(0.0,1.0) dataRL = 0.5* numpy.transpose(numpy.array([ +dataXY[:,:,0]-I*dataXY[:,:,1]+I*dataXY[:,:,2]+dataXY[:,:,3], +dataXY[:,:,0]+I*dataXY[:,:,1]+I*dataXY[:,:,2]-dataXY[:,:,3], +dataXY[:,:,0]-I*dataXY[:,:,1]-I*dataXY[:,:,2]-dataXY[:,:,3], +dataXY[:,:,0]+I*dataXY[:,:,1]-I*dataXY[:,:,2]+dataXY[:,:,3]]), (1,2,0)) tc.putcol(outcol,dataRL) #Change metadata information to be circular feeds if not skipmetadata: feed = pt.table(tc.getkeyword('FEED'),readonly=False,ack=False) for tpart in feed.iter('ANTENNA_ID'): tpart.putcell('POLARIZATION_TYPE',0,['R','L']) polariz = pt.table(tc.getkeyword('POLARIZATION'),readonly=False,ack=False) polariz.putcell('CORR_TYPE',0,[5,6,7,8]) tc.close()
def mscirc2lin(incol, outcol, outms, skipmetadata): tc = pt.table(outms,readonly=False, ack=False) dataRL = tc.getcol(incol) I=numpy.complex(0.0,1.0) dataXY = 0.5* numpy.transpose(numpy.array([ +dataRL[:,:,0]+dataRL[:,:,1]+dataRL[:,:,2]+dataRL[:,:,3], I*(+dataRL[:,:,0]-dataRL[:,:,1]+dataRL[:,:,2]-dataRL[:,:,3]), I*(-dataRL[:,:,0]-dataRL[:,:,1]+dataRL[:,:,2]+dataRL[:,:,3]), +dataRL[:,:,0]-dataRL[:,:,1]-dataRL[:,:,2]+dataRL[:,:,3] ]), (1,2,0)) tc.putcol(outcol,dataXY) #Change metadata information to be circular feeds if not skipmetadata: feed = pt.table(tc.getkeyword('FEED'),readonly=False, ack=False) for tpart in feed.iter('ANTENNA_ID'): tpart.putcell('POLARIZATION_TYPE',0,['X','Y']) polariz = pt.table(tc.getkeyword('POLARIZATION'),readonly=False, ack=False) polariz.putcell('CORR_TYPE',0,[9,10,11,12]) tc.close()
def get_gev_vector(target_psd_matrix, noise_psd_matrix): """ Returns the GEV beamforming vector. :param target_psd_matrix: Target PSD matrix with shape (bins, sensors, sensors) :param noise_psd_matrix: Noise PSD matrix with shape (bins, sensors, sensors) :return: Set of beamforming vectors with shape (bins, sensors) """ bins, sensors, _ = target_psd_matrix.shape beamforming_vector = np.empty((bins, sensors), dtype=np.complex) for f in range(bins): try: eigenvals, eigenvecs = eigh(target_psd_matrix[f, :, :], noise_psd_matrix[f, :, :]) except np.linalg.LinAlgError: eigenvals, eigenvecs = eig(target_psd_matrix[f, :, :], noise_psd_matrix[f, :, :]) beamforming_vector[f, :] = eigenvecs[:, np.argmax(eigenvals)] return beamforming_vector
def apply_sdw_mwf(mix, target_psd_matrix, noise_psd_matrix, mu=1, corr=None): """ Apply speech distortion weighted MWF: h = Tpsd * e1 / (Tpsd + mu*Npsd) :param mix: the signal complex FFT :param target_psd_matrix (bins, sensors, sensors) :param noise_psd_matrix :param mu: the lagrange factor :return """ bins, sensors, frames = mix.shape ref_vector = np.zeros((sensors,1), dtype=np.float) if corr is None: ref_ch = 0 else: # choose the channel with highest correlation with the others corr=corr.tolist() while len(corr) > sensors: corr.remove(np.min(corr)) ref_ch=np.argmax(corr) ref_vector[ref_ch,0]=1 mwf_vector = solve(target_psd_matrix + mu*noise_psd_matrix, target_psd_matrix[:,:,ref_ch]) return np.einsum('...a,...at->...t', mwf_vector.conj(), mix)
def get_gevd_vals_vecs(target_psd_matrix, noise_psd_matrix): """ Returns the eigenvalues and eigenvectors of GEVD :param target_psd_matrix: Target PSD matrix with shape (bins, sensors, sensors) :param noise_psd_matrix: Noise PSD matrix with shape (bins, sensors, sensors) :return: Set of eigen values with shape (bins, sensors) eigenvectors (bins, sensors, sensors) """ bins, sensors, _ = target_psd_matrix.shape eigen_values = np.empty((bins, sensors), dtype=np.complex) eigen_vectors = np.empty((bins, sensors, sensors), dtype=np.complex) for f in range(bins): try: eigenvals, eigenvecs = eigh(target_psd_matrix[f, :, :], noise_psd_matrix[f, :, :]) except np.linalg.LinAlgError: eigenvals, eigenvecs = eig(target_psd_matrix[f, :, :], noise_psd_matrix[f, :, :]) # values in increasing order eigen_values[f,:] = eigenvals eigen_vectors[f, :] = eigenvecs return eigen_values, eigen_vectors
def test_simple_complex(self): #Test native complex input with native double scalar min/max. #Test native input with complex double scalar min/max. a = 3 * self._generate_data_complex(self.nr, self.nc) m = -0.5 M = 1. ac = self.fastclip(a, m, M) act = self.clip(a, m, M) assert_array_strict_equal(ac, act) #Test native input with complex double scalar min/max. a = 3 * self._generate_data(self.nr, self.nc) m = -0.5 + 1.j M = 1. + 2.j ac = self.fastclip(a, m, M) act = self.clip(a, m, M) assert_array_strict_equal(ac, act)
def _queryDefaults(self, component): defaults = {"boolean" : component.complexBooleanProp, "ulong" : component.complexULongProp, "short" : component.complexShortProp, "float" : component.complexFloatProp, "octet" : component.complexOctetProp, "ushort" : component.complexUShort, "double" : component.complexDouble, "long" : component.complexLong, "longlong" : component.complexLongLong, "ulonglong" : component.complexULongLong} # TODO: char #"char" : component.complexCharProp, #"char" : numpy.complex(0,1), return defaults
def spbessel(n, kr): """Spherical Bessel function (first kind) of order n at kr Parameters ---------- n : array_like Order kr: array_like Argument Returns ------- J : complex float Spherical Bessel """ n, kr = scalar_broadcast_match(n, kr) if _np.any(n < 0) | _np.any(kr < 0) | _np.any(_np.mod(n, 1) != 0): J = _np.zeros(kr.shape, dtype=_np.complex_) kr_non_zero = kr != 0 J[kr_non_zero] = _np.lib.scimath.sqrt(_np.pi / 2 / kr[kr_non_zero]) * besselj(n[kr_non_zero] + 0.5, kr[kr_non_zero]) J[_np.logical_and(kr == 0, n == 0)] = 1 else: J = scy.spherical_jn(n.astype(_np.int), kr) return _np.squeeze(J)
def spneumann(n, kr): """Spherical Neumann (Bessel second kind) of order n at kr Parameters ---------- n : array_like Order kr: array_like Argument Returns ------- Yv : complex float Spherical Neumann (Bessel second kind) """ n, kr = scalar_broadcast_match(n, kr) if _np.any(n < 0) | _np.any(_np.mod(n, 1) != 0): Yv = _np.full(kr.shape, _np.nan, dtype=_np.complex_) kr_non_zero = kr != 0 Yv[kr_non_zero] = _np.lib.scimath.sqrt(_np.pi / 2 / kr[kr_non_zero]) * neumann(n[kr_non_zero] + 0.5, kr[kr_non_zero]) Yv[kr < 0] = -Yv[kr < 0] else: Yv = scy.spherical_yn(n.astype(_np.int), kr) Yv[_np.isinf(Yv)] = _np.nan # return possible infs as nan to stay consistent return _np.squeeze(Yv)
def spherical_extrapolation(order, array_configuration, k_mic, k_scatter=None, k_dual=None): """ Factor that relate signals recorded on a sphere to it's center. Parameters ---------- order : int Order array_configuration : ArrayConfiguration List/Tuple/ArrayConfiguration, see io.ArrayConfiguration k_mic : array_like K vector for microphone array k_scatter: array_like, optional K vector for scatterer (Default: same as k_mic) Returns ------- b : array, complex """ array_configuration = ArrayConfiguration(*array_configuration) if array_configuration.array_type is 'open': if array_configuration.transducer_type is 'omni': return bn_open_omni(order, k_mic) elif array_configuration.transducer_type is 'cardioid': return bn_open_cardioid(order, k_mic) elif array_configuration.array_type is 'rigid': if array_configuration.transducer_type is 'omni': return bn_rigid_omni(order, k_mic, k_scatter) elif array_configuration.transducer_type is 'cardioid': return bn_rigid_cardioid(order, k_mic, k_scatter) elif array_configuration.array_type is 'dual': return bn_dual_open_omni(order, k_mic, k_dual)
def sph_harm(m, n, az, el, type='complex'): '''Compute sphercial harmonics Parameters ---------- m : (int) Order of the spherical harmonic. abs(m) <= n n : (int) Degree of the harmonic, sometimes called l. n >= 0 az: (float) Azimuthal (longitudinal) coordinate [0, 2pi], also called Theta. el : (float) Elevation (colatitudinal) coordinate [0, pi], also called Phi. Returns ------- y_mn : (complex float) Complex spherical harmonic of order m and degree n, sampled at theta = az, phi = el ''' if type == 'legacy': return scy.sph_harm(m, n, az, el) elif type == 'real': Lnm = scy.lpmv(_np.abs(m), n, _np.cos(el)) factor_1 = (2 * n + 1) / (4 * _np.pi) factor_2 = scy.factorial(n - _np.abs(m)) / scy.factorial(n + abs(m)) if m != 0: factor_1 = 2 * factor_1 if m < 0: return (-1) ** m * _np.sqrt(factor_1 * factor_2) * Lnm * _np.sin(m * az) else: return (-1) ** m * _np.sqrt(factor_1 * factor_2) * Lnm * _np.cos(m * az) else: # For the correct Condon–Shortley phase, all m>0 need to be increased by 1 return (-1) ** _np.float_(m - (m < 0) * (m % 2)) * scy.sph_harm(m, n, az, el)
def _solve_lens_equation(self, complex_value): """ Solve the lens equation for the given point (in complex coordinates). """ complex_conjugate = np.conjugate(complex_value) return complex_value - (1. / (1. + self.q)) * ( (1./complex_conjugate) + (self.q / (complex_conjugate - self.s)))
def make_filterbank(self): erb_max = hz2erb(self.sr/2.0) erb_freqs = np.arange(0, self.n_bins) * erb_max / float(self.n_bins - 1) self.hz_freqs = erb2hz(erb_freqs) self.widths = np.round(0.5 * (self.n_bins - 1) / erb_max * 9.26 * 0.00437 * self.sr * np.exp(-erb_freqs / 9.26) - 0.5) self.filters = [] for b in range(self.n_bins): w = self.widths[b] f = self.hz_freqs[b] exponential = np.exp( np.complex(0,1) * 2 * np.pi * f / self.sr * np.arange(-w, w + 1)) self.filters.append(np.hanning(2 * w + 1) * exponential)