我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用numpy.nanargmax()。
def guess(representation, sims, xi, a, a_, b): sa = sims[xi[a]] sa_ = sims[xi[a_]] sb = sims[xi[b]] add_sim = -sa+sa_+sb if a in representation.wi: add_sim[representation.wi[a]] = 0 if a_ in representation.wi: add_sim[representation.wi[a_]] = 0 if b in representation.wi: add_sim[representation.wi[b]] = 0 b_add = representation.iw[np.nanargmax(add_sim)] mul_sim = sa_*sb*np.reciprocal(sa+0.01) if a in representation.wi: mul_sim[representation.wi[a]] = 0 if a_ in representation.wi: mul_sim[representation.wi[a_]] = 0 if b in representation.wi: mul_sim[representation.wi[b]] = 0 b_mul = representation.iw[np.nanargmax(mul_sim)] return b_add, b_mul
def test_nanargmax(self): tgt = np.argmax(self.mat) for mat in self.integer_arrays(): assert_equal(np.nanargmax(mat), tgt)
def generalized_esd(x, r, alpha=0.05, method='mean'): """Generalized ESD test for outliers (http://www.itl.nist.gov/div898/handbook/eda/section3/eda35h3.htm). Args: x (numpy.ndarray): the data r (int): max number of outliers alpha (float): the signifiance level method (str): 'median' or 'mean' Returns: list[int]: list of the index of outliers """ x = np.asarray(x, dtype=np.float64) fn = __get_pd_median if method == 'median' else __get_pd_mean NaN = float('nan') outliers = [] N = len(x) for i in range(1, r + 1): if np.any(~np.isnan(x)): m, e = fn(x) if e != 0.: y = np.abs(x - m) j = np.nanargmax(y) R = y[j] lam = __get_lambda_critical(N, i, alpha) if R > lam * e: outliers.append(j) x[j] = NaN else: break else: break else: break return outliers
def _select_best_score(scores, args): return np.nanargmax(np.array(scores))
def _select_best_measure_index(curr_measures, args): idx = None try: if args.measure == 'aicc': # The best score for AICc is the minimum. idx = np.nanargmin(curr_measures) elif args.measure in ['hmm-distance', 'wasserstein', 'mahalanobis']: # The best score for the l-d measure is the maximum. idx = np.nanargmax(curr_measures) except: idx = random.choice(range(len(curr_measures))) assert idx is not None return idx
def choose_arm(x, experts, explore): n_arms = len(experts) # make predictions preds = [expert.predict(x) for expert in experts] # get best arm arm_max = np.nanargmax(preds) # create arm selection probabilities P = [(1-explore)*(arm==arm_max) + explore/n_arms for arm in range(n_arms)] # select an arm chosen_arm = np.random.choice(np.arange(n_arms), p=P) pred = preds[chosen_arm] return chosen_arm, pred
def predict_ana( model, a, a2, b, realb2 ): questWordIndices = [ model.word2id[x] for x in (a,a2,b) ] # b2 is effectively iterating through the vocab. The row is all the cosine values b2a2 = model.sim_row(a2) b2a = model.sim_row(a) b2b = model.sim_row(b) addsims = b2a2 - b2a + b2b addsims[questWordIndices] = -10000 iadd = np.nanargmax(addsims) b2add = model.vocab[iadd] # For debugging purposes ia = model.word2id[a] ia2 = model.word2id[a2] ib = model.word2id[b] ib2 = model.word2id[realb2] realaddsim = addsims[ib2] mulsims = ( b2a2 + 1 ) * ( b2b + 1 ) / ( b2a + 1.001 ) mulsims[questWordIndices] = -10000 imul = np.nanargmax(mulsims) b2mul = model.vocab[imul] return b2add, b2mul
def decode_location(likelihood, pos_centers, time_centers): """Finds the decoded location based on the centers of the position bins. Parameters ---------- likelihood : np.array With shape(n_timebins, n_positionbins) pos_centers : np.array time_centers : np.array Returns ------- decoded : nept.Position Estimate of decoded position. """ prob_rows = np.sum(np.isnan(likelihood), axis=1) < likelihood.shape[1] max_decoded_idx = np.nanargmax(likelihood[prob_rows], axis=1) prob_decoded = pos_centers[max_decoded_idx] decoded_pos = np.empty((likelihood.shape[0], pos_centers.shape[1])) * np.nan decoded_pos[prob_rows] = prob_decoded decoded_pos = np.squeeze(decoded_pos) return nept.Position(decoded_pos, time_centers)
def maxabs(trace, starttime=None, endtime=None): """Returns the maximum of the absolute values of `trace`, and its occurrence time. In other words, returns the point `(time, value)` where `value = max(abs(trace.data))` and time (`UTCDateTime`) is the time occurrence of `value` :param trace: the input obspy.core.Trace :param starttime: (`obspy.UTCDateTime`) the start time (None or missing defaults to the trace end): the maximum of the trace `abs` will be searched *from* this time. This argument, if provided, does not affect the returned `time` which will be always relative to the trace passed as argument :param endtime: an obspy UTCDateTime object (or any value `UTCDateTime` accepts, e.g. integer / `datetime` object) denoting the end time (None or missing defaults to the trace end): the maximum of the trace `abs` will be searched *until* this time :return: the tuple (time, value) where `value = max(abs(trace.data))`, and time is the value occurrence (`UTCDateTime`) :return: the tuple `(time_of_max_abs, max_abs)` """ original_stime = None if starttime is None else trace.stats.starttime if starttime is not None or endtime is not None: # from the docs: "this returns a New Trace object # Does not copy data but just passes a reference to it" trace = trace.slice(starttime, endtime) if trace.stats.npts < 1: return np.nan idx = np.nanargmax(np.abs(trace.data)) val = trace.data[idx] tdelta = 0 if original_stime is None else trace.stats.starttime - original_stime time = timeof(trace, idx) + tdelta return (time, val)
def optimize_threshold_with_roc(roc, thresholds, criterion='dist'): if roc.shape[1] > roc.shape[0]: roc = roc.T assert(roc.shape[0] == thresholds.shape[0]) if criterion == 'margin': scores = roc[:,1]-roc[:,0] else: scores = -cdist(np.array([[0,1]]), roc) ti = np.nanargmax(scores) return thresholds[ti], ti
def optimize_threshold_with_prc(prc, thresholds, criterion='min'): prc[np.isnan(prc)] = 0 if prc.shape[1] > prc.shape[0]: prc = prc.T assert(prc.shape[0] == thresholds.shape[0]) if criterion == 'sum': scores = prc.sum(axis=1) elif criterion.startswith('dist'): scores = -cdist(np.array([[1,1]]), prc) else: scores = prc.min(axis=1) ti = np.nanargmax(scores) return thresholds[ti], ti
def optimize_threshold_with_f1(f1c, thresholds, criterion='max'): #f1c[np.isnan(f1c)] = 0 if criterion == 'max': ti = np.nanargmax(f1c) else: ti = np.nanargmin(np.abs(thresholds-0.5*f1c)) #assert(np.all(thresholds>=0)) #idx = (thresholds>=f1c*0.5-mp) & (thresholds<=f1c*0.5+mp) #assert(np.any(idx)) #ti = np.where(idx)[0][f1c[idx].argmax()] return thresholds[ti], ti
def compute_draw_info(self, x, ys): bs = self.compute_baseline(x, ys) im = np.nanargmax(ys-bs, axis=1) lines = (x[im], bs[np.arange(bs.shape[0]), im]), (x[im], ys[np.arange(ys.shape[0]), im]) return [("curve", (x, self.compute_baseline(x, ys), INTEGRATE_DRAW_BASELINE_PENARGS)), ("curve", (x, ys, INTEGRATE_DRAW_BASELINE_PENARGS)), ("line", lines)]
def compute_integral(self, x_s, y_s): y_s = y_s - self.compute_baseline(x_s, y_s) if len(x_s) == 0: return np.zeros((y_s.shape[0],)) * np.nan # avoid whole nan rows whole_nan_rows = np.isnan(y_s).all(axis=1) y_s[whole_nan_rows] = 0 # select positions pos = x_s[np.nanargmax(y_s, axis=1)] # set unknown results pos[whole_nan_rows] = np.nan return pos
def get_best_threshold(y_ref, y_pred_score, plot=False): """ Get threshold on scores that maximizes f1 score. Parameters ---------- y_ref : array Reference labels (binary). y_pred_score : array Predicted scores. plot : bool If true, plot ROC curve Returns ------- best_threshold : float threshold on score that maximized f1 score max_fscore : float f1 score achieved at best_threshold """ pos_weight = 1.0 - float(len(y_ref[y_ref == 1]))/float(len(y_ref)) neg_weight = 1.0 - float(len(y_ref[y_ref == 0]))/float(len(y_ref)) sample_weight = np.zeros(y_ref.shape) sample_weight[y_ref == 1] = pos_weight sample_weight[y_ref == 0] = neg_weight print "max prediction value = %s" % np.max(y_pred_score) print "min prediction value = %s" % np.min(y_pred_score) precision, recall, thresholds = \ metrics.precision_recall_curve(y_ref, y_pred_score, pos_label=1, sample_weight=sample_weight) beta = 1.0 btasq = beta**2.0 fbeta_scores = (1.0 + btasq)*(precision*recall)/((btasq*precision)+recall) max_fscore = fbeta_scores[np.nanargmax(fbeta_scores)] best_threshold = thresholds[np.nanargmax(fbeta_scores)] if plot: plt.figure(1) plt.subplot(1, 2, 1) plt.plot(recall, precision, '.b', label='PR curve') plt.xlim([0.0, 1.0]) plt.ylim([0.0, 1.0]) plt.xlabel('Recall') plt.ylabel('Precision') plt.title('Precision-Recall Curve') plt.legend(loc="lower right", frameon=True) plt.subplot(1, 2, 2) plt.plot(thresholds, fbeta_scores[:-1], '.r', label='f1-score') plt.xlabel('Probability Threshold') plt.ylabel('F1 score') plt.show() plot_data = (recall, precision, thresholds, fbeta_scores[:-1]) return best_threshold, max_fscore, plot_data
def nanargmin(a, axis=None): """ Return the indices of the minimum values in the specified axis ignoring NaNs. For all-NaN slices ``ValueError`` is raised. Warning: the results cannot be trusted if a slice contains only NaNs and Infs. Parameters ---------- a : array_like Input data. axis : int, optional Axis along which to operate. By default flattened input is used. Returns ------- index_array : ndarray An array of indices or a single index value. See Also -------- argmin, nanargmax Examples -------- >>> a = np.array([[np.nan, 4], [2, 3]]) >>> np.argmin(a) 0 >>> np.nanargmin(a) 2 >>> np.nanargmin(a, axis=0) array([1, 1]) >>> np.nanargmin(a, axis=1) array([1, 0]) """ a, mask = _replace_nan(a, np.inf) res = np.argmin(a, axis=axis) if mask is not None: mask = np.all(mask, axis=axis) if np.any(mask): raise ValueError("All-NaN slice encountered") return res
def nanargmax(a, axis=None): """ Return the indices of the maximum values in the specified axis ignoring NaNs. For all-NaN slices ``ValueError`` is raised. Warning: the results cannot be trusted if a slice contains only NaNs and -Infs. Parameters ---------- a : array_like Input data. axis : int, optional Axis along which to operate. By default flattened input is used. Returns ------- index_array : ndarray An array of indices or a single index value. See Also -------- argmax, nanargmin Examples -------- >>> a = np.array([[np.nan, 4], [2, 3]]) >>> np.argmax(a) 0 >>> np.nanargmax(a) 1 >>> np.nanargmax(a, axis=0) array([1, 0]) >>> np.nanargmax(a, axis=1) array([1, 1]) """ a, mask = _replace_nan(a, -np.inf) res = np.argmax(a, axis=axis) if mask is not None: mask = np.all(mask, axis=axis) if np.any(mask): raise ValueError("All-NaN slice encountered") return res
def evaluate_hyperparameters(dataset, iterator, args): # Select features if args.features is not None and args.features != dataset.feature_names: print('selecting features ...') features = _explode_features(args.features) start = timeit.default_timer() dataset = dataset.dataset_from_feature_names(features) print('done, took %fs' % (timeit.default_timer() - start)) print('') states = range(3, 22 + 1) # = [3,...,22] topologies = ['full', 'left-to-right-full', 'left-to-right-1', 'left-to-right-2'] n_combinations = len(states) * len(topologies) curr_step = 0 combinations = [] measures = [] for state in states: for topology in topologies: curr_step += 1 prefix = '%.3d_%d_%s' % (curr_step, state, topology) print('(%.3d/%.3d) evaluating state=%d and topology=%s ...' % (curr_step, n_combinations, state, topology)) start = timeit.default_timer() try: # Configure args from which the HMMs are created args.n_states = state args.topology = topology ll_stats = _compute_averaged_pos_and_neg_lls(dataset, iterator, prefix, args) measure = _compute_measure(ll_stats, dataset, args) except: measure = np.nan if measure is np.isnan(measure): print('measure: not computable') else: print('measure: %f' % measure) combinations.append((str(state), topology)) measures.append(measure) print('done, took %fs' % (timeit.default_timer() - start)) print('') best_idx = np.nanargmax(np.array(measures)) # get the argmax ignoring NaNs print('best combination with score %f: %s' % (measures[best_idx], ', '.join(combinations[best_idx]))) print('detailed reports have been saved') # Save results assert len(combinations) == len(measures) if args.output_dir is not None: filename = '_results.csv' with open(os.path.join(args.output_dir, filename), 'wb') as f: writer = csv.writer(f, delimiter=';') writer.writerow(['', 'idx', 'measure', 'combination']) for idx, (measure, combination) in enumerate(zip(measures, combinations)): selected = '*' if best_idx == idx else '' writer.writerow([selected, '%d' % idx, '%f' % measure, ', '.join(combination)])
def evaluate_pca(dataset, iterator, args): # Select features if args.features is not None and args.features != dataset.feature_names: print('selecting features ...') features = _explode_features(args.features) start = timeit.default_timer() dataset = dataset.dataset_from_feature_names(features) print('done, took %fs' % (timeit.default_timer() - start)) print('') pca_components = range(1, dataset.n_features) total_steps = len(pca_components) if 'pca' not in args.transformers: args.transformers.append('pca') curr_step = 0 measures = [] for n_components in pca_components: curr_step += 1 prefix = '%.3d' % curr_step print('(%.3d/%.3d) evaluating with %d pca components ...' % (curr_step, total_steps, n_components)) start = timeit.default_timer() try: args.pca_components = n_components ll_stats = _compute_averaged_pos_and_neg_lls(dataset, iterator, prefix, args) measure = _compute_measure(ll_stats, dataset, args) except: measure = np.nan if measure is np.isnan(measure): print('measure: not computable') else: print('measure: %f' % measure) # Correct score. The problem is that it is computed given the dataset, which has too many features. measure = (measure * float(dataset.n_features)) / float(n_components) measures.append(measure) print('done, took %fs' % (timeit.default_timer() - start)) print('') assert len(pca_components) == len(measures) best_idx = np.nanargmax(np.array(measures)) # get the argmax ignoring NaNs print('best result with score %f: %d PCA components' % (measures[best_idx], pca_components[best_idx])) print('detailed reports have been saved') # Save results if args.output_dir is not None: filename = '_results.csv' with open(os.path.join(args.output_dir, filename), 'wb') as f: writer = csv.writer(f, delimiter=';') writer.writerow(['', 'idx', 'measure', 'components']) for idx, (measure, n_components) in enumerate(zip(measures, pca_components)): selected = '*' if best_idx == idx else '' writer.writerow([selected, '%d' % idx, '%f' % measure, '%d' % n_components])
def evaluate_fhmms(dataset, iterator, args): # Select features if args.features is not None and args.features != dataset.feature_names: print('selecting features ...') features = _explode_features(args.features) start = timeit.default_timer() dataset = dataset.dataset_from_feature_names(features) print('done, took %fs' % (timeit.default_timer() - start)) print('') chains = [1, 2, 3, 4] total_steps = len(chains) curr_step = 0 measures = [] for chain in chains: curr_step += 1 prefix = '%.3d_%d-chains' % (curr_step, chain) print('(%.3d/%.3d) evaluating n_chains=%d ...' % (curr_step, total_steps, chain)) start = timeit.default_timer() old_loglikelihood_method = args.loglikelihood_method try: # Configure args from which the HMMs are created args.n_chains = chain if chain == 1: args.model = 'hmm' args.loglikelihood_method = 'exact' # there's no approx loglikelihood method for HMMs else: args.model = 'fhmm-seq' ll_stats = _compute_averaged_pos_and_neg_lls(dataset, iterator, prefix, args, save_model=True, compute_distances=False) measure = _compute_measure(ll_stats, dataset, args) except: measure = np.nan args.loglikelihood_method = old_loglikelihood_method if measure is np.isnan(measure): print('measure: not computable') else: print('measure: %f' % measure) measures.append(measure) print('done, took %fs' % (timeit.default_timer() - start)) print('') best_idx = np.nanargmax(np.array(measures)) # get the argmax ignoring NaNs print('best model with score %f: %d chains' % (measures[best_idx], chains[best_idx])) print('detailed reports have been saved') # Save results assert len(chains) == len(measures) if args.output_dir is not None: filename = '_results.csv' with open(os.path.join(args.output_dir, filename), 'wb') as f: writer = csv.writer(f, delimiter=';') writer.writerow(['', 'idx', 'measure', 'chains']) for idx, (measure, chain) in enumerate(zip(measures, chains)): selected = '*' if best_idx == idx else '' writer.writerow([selected, '%d' % idx, '%f' % measure, '%d' % chain])
def phase_shift( shotGather, num_vel=2048, min_frequency=5, max_frequency=100, min_velocity=1, max_velocity=1000 ): # Ensure that min_velocity is greater than zero for numerical stability if min_velocity < 1: min_velocity = 1 # Frequency vector freq = np.arange(0, shotGather.fnyq, shotGather.df) # FFT of timeHistories (Equation 1 of Park et al. 1998)..................... U = np.fft.fft(shotGather.timeHistories, axis=0) # Remove frequencies above/below specificied max/min frequencies and downsample (if required by zero padding) fminID = np.argmin( np.absolute(freq-min_frequency) ) fmaxID = np.argmin( np.absolute(freq-max_frequency) ) freq_id = range(fminID,(fmaxID+1), shotGather.multiple) freq = freq[freq_id] U = U[freq_id, :] # Trial velocities v_vals = np.linspace( min_velocity, max_velocity, num_vel ) # Initialize variables v_peak = np.zeros( np.shape(freq) ) V = np.zeros( (np.shape(v_vals)[0], len(freq)) ) pnorm = np.zeros( np.shape(V) ) # Transformation ........................................................... # Loop through frequencies for c in range( len(freq) ): # Loop through trial velocities at each frequency for r in range( np.shape(v_vals)[0] ): # Set power equal to NaN at wavenumbers > kres if v_vals[r] < (2*np.pi*freq[c]/shotGather.kres): V[r,c] = float( 'nan' ) # (Equation 4 in Park et al. 1998) else: V[r,c] = np.abs( np.sum( U[c,:]/np.abs(U[c,:]) * np.exp( 1j*2*np.pi*freq[c]*shotGather.position / v_vals[r] ) ) ) # Identify index associated with peak power at current frequency max_id = np.nanargmax( V[:,c] ) pnorm[:,c] = V[:,c] / V[max_id,c] v_peak[c] = v_vals[max_id] # Create instance of DispersionPower class.................................. dispersionPower = dctypes.DispersionPower( freq, v_peak, v_vals, 'velocity', shotGather.kres, pnorm ) return dispersionPower #******************************************************************************* # Slant-stack transform
def compose_ko(radargrids, qualitygrids): """Composes grids according to quality information using quality \ information as a knockout criterion. The value of the composed pixel is taken from the radargrid whose quality grid has the highest value. Parameters ---------- radargrids : list of arrays radar data to be composited. Each item in the list corresponds to the data of one radar location. All items must have the same shape. qualitygrids : list of arrays quality data to decide upon which radar site will contribute its pixel to the composite. Then length of this list must be the same as that of `radargrids`. All items must have the same shape and be aligned with the items in `radargrids`. Returns ------- composite : array """ # first add a fallback array for all pixels having missing values in all # radargrids radarfallback = (np.repeat(np.nan, np.prod(radargrids[0].shape)) .reshape(radargrids[0].shape)) radargrids.append(radarfallback) radarinfo = np.array(radargrids) # then do the same for the quality grids qualityfallback = (np.repeat(-np.inf, np.prod(radargrids[0].shape)) .reshape(radargrids[0].shape)) qualitygrids.append(qualityfallback) qualityinfo = np.array(qualitygrids) select = np.nanargmax(qualityinfo, axis=0) composite = (radarinfo.reshape((radarinfo.shape[0], -1)) [select.ravel(), np.arange(np.prod(radarinfo.shape[1:]))] .reshape(radarinfo.shape[1:])) radargrids.pop() qualitygrids.pop() return composite
def cum_beam_block_frac(pbb): """Cumulative beam blockage fraction along a beam. Computes the cumulative beam blockage (cbb) along a beam from the partial beam blockage (pbb) fraction of each bin along that beam. CBB in one bin along a beam will always be at least as high as the maximum PBB of the preceeding bins. .. versionadded:: 0.10.0 Parameters ---------- pbb : :class:`numpy:numpy.ndarray` 2-D array of floats of shape (num beams, num range bins) Partial beam blockage fraction of a bin along a beam [m] Returns ------- cbb : :class:`numpy:numpy.ndarray` Array of floats of the same shape as pbb Cumulative partial beam blockage fraction [unitless] Examples -------- >>> PBB = beam_block_frac(Th,Bh,a) #doctest: +SKIP >>> CBB = cum_beam_block_frac(PBB) #doctest: +SKIP See :ref:`notebooks/beamblockage/wradlib_beamblock.ipynb`. """ # This is the index of the maximum PBB along each beam maxindex = np.nanargmax(pbb, axis=1) cbb = np.copy(pbb) # Iterate over all beams for ii, index in enumerate(maxindex): premax = 0. for jj in range(index): # Only iterate to max index to make this faster if pbb[ii, jj] > premax: cbb[ii, jj] = pbb[ii, jj] premax = pbb[ii, jj] else: cbb[ii, jj] = premax # beyond max index, everything is max anyway cbb[ii, index:] = pbb[ii, index] return cbb
def _auto_low_rank_model(data, mode, n_jobs, method_params, cv, stop_early=True, verbose=None): """compute latent variable models.""" method_params = cp.deepcopy(method_params) iter_n_components = method_params.pop('iter_n_components') if iter_n_components is None: iter_n_components = np.arange(5, data.shape[1], 5) from sklearn.decomposition import PCA, FactorAnalysis if mode == 'factor_analysis': est = FactorAnalysis elif mode == 'pca': est = PCA else: raise ValueError('Come on, this is not a low rank estimator: %s' % mode) est = est(**method_params) est.n_components = 1 scores = np.empty_like(iter_n_components, dtype=np.float64) scores.fill(np.nan) # make sure we don't empty the thing if it's a generator max_n = max(list(cp.deepcopy(iter_n_components))) if max_n > data.shape[1]: warn('You are trying to estimate %i components on matrix ' 'with %i features.' % (max_n, data.shape[1])) for ii, n in enumerate(iter_n_components): est.n_components = n try: # this may fail depending on rank and split score = _cross_val(data=data, est=est, cv=cv, n_jobs=n_jobs) except ValueError: score = np.inf if np.isinf(score) or score > 0: logger.info('... infinite values encountered. stopping estimation') break logger.info('... rank: %i - loglik: %0.3f' % (n, score)) if score != -np.inf: scores[ii] = score if (ii >= 3 and np.all(np.diff(scores[ii - 3:ii]) < 0.) and stop_early is True): # early stop search when loglik has been going down 3 times logger.info('early stopping parameter search.') break # happens if rank is too low right form the beginning if np.isnan(scores).all(): raise RuntimeError('Oh no! Could not estimate covariance because all ' 'scores were NaN. Please contact the MNE-Python ' 'developers.') i_score = np.nanargmax(scores) best = est.n_components = iter_n_components[i_score] logger.info('... best model at rank = %i' % best) runtime_info = {'ranks': np.array(iter_n_components), 'scores': scores, 'best': best, 'cv': cv} return est, runtime_info
def get_multievent_sg(cum_trace, tmin, tmax, tstart, threshold_inside_tmin_tmax_percent, threshold_inside_tmin_tmax_sec, threshold_after_tmax_percent): """ Returns the tuple (or a list of tuples, if the first argument is a stream) of the values (score, UTCDateTime of arrival) where scores is: 0: no double event, 1: double event inside tmin_tmax, 2: double event after tmax, 3: both double event previously defined are detected If score is 2 or 3, the second argument is the UTCDateTime denoting the occurrence of the first sample triggering the double event after tmax :param trace: the input obspy.core.Trace """ tmin = utcdatetime(tmin) tmax = utcdatetime(tmax) tstart = utcdatetime(tstart) # split traces between tmin and tmax and after tmax traces = [cum_trace.slice(tmin, tmax), cum_trace.slice(tmax, None)] # calculate second derivative and normalize: second_derivs = [] max_ = np.nan for ttt in traces: ttt.taper(type='cosine', max_percentage=0.05) sec_der = savitzky_golay(ttt.data, 31, 2, deriv=2) sec_der_abs = np.abs(sec_der) idx = np.nanargmax(sec_der_abs) # get max (global) for normalization: max_ = np.nanmax([max_, sec_der_abs[idx]]) second_derivs.append(sec_der_abs) # normalize second derivatives: for der in second_derivs: der /= max_ result = 0 # case A: see if after tmax we exceed a threshold indices = np.where(second_derivs[1] >= threshold_after_tmax_percent)[0] if len(indices): result = 2 # case B: see if inside tmin tmax we exceed a threshold, and in case check the duration deltatime = 0 indices = np.where(second_derivs[0] >= threshold_inside_tmin_tmax_percent)[0] starttime = endtime = None if len(indices) >= 2: idx0 = indices[0] idx1 = indices[-1] starttime = timeof(traces[0], idx0) endtime = timeof(traces[0], idx1) deltatime = endtime - starttime if deltatime >= threshold_inside_tmin_tmax_sec: result += 1 return result, deltatime, starttime, endtime