diff --git a/src/pylife/materiallaws/notch_approximation_law.py b/src/pylife/materiallaws/notch_approximation_law.py index d067cd23..7a0286ac 100644 --- a/src/pylife/materiallaws/notch_approximation_law.py +++ b/src/pylife/materiallaws/notch_approximation_law.py @@ -569,7 +569,7 @@ class NotchApproxBinner: def __init__(self, notch_approximation_law, number_of_bins=100): self._n_bins = number_of_bins self._notch_approximation_law = notch_approximation_law - self.ramberg_osgood_relation = notch_approximation_law.ramberg_osgood_relation + self._ramberg_osgood_relation = notch_approximation_law.ramberg_osgood_relation self._max_load_rep = None def initialize(self, max_load): @@ -579,14 +579,14 @@ def initialize(self, max_load): ---------- max_load : array_like The state of the maximum nominal load that is expected. The first - element is chosen as representative to caclulate the lookup table. + element is chosen as representative to calculate the lookup table. Returns ------- self """ max_load = np.asarray(max_load) - self._max_load_rep, _ = self._rep_abs_and_sign(max_load) + self._max_load_rep, _ = self._representative_value_and_sign(max_load) load = self._param_for_lut(self._n_bins, max_load) self._lut_primary = self._notch_approximation_law.primary(load) @@ -596,6 +596,13 @@ def initialize(self, max_load): return self + @property + def ramberg_osgood_relation(self): + """the Ramberg-Osgood relation object, i.e., an object of type RambergOsgood + provided by the notch approximation law. + """ + return self._ramberg_osgood_relation + def primary(self, load): """Lookup the stress strain of the primary branch. @@ -619,7 +626,7 @@ def primary(self, load): """ self._raise_if_uninitialized() - load_rep, sign = self._rep_abs_and_sign(load) + load_rep, sign = self._representative_value_and_sign(load) if load_rep > self._max_load_rep: msg = f"Requested load `{load_rep}`, higher than initialized maximum load `{self._max_load_rep}`" @@ -651,7 +658,7 @@ def secondary(self, delta_load): """ self._raise_if_uninitialized() - delta_load_rep, sign = self._rep_abs_and_sign(delta_load) + delta_load_rep, sign = self._representative_value_and_sign(delta_load) if delta_load_rep > 2.0 * self._max_load_rep: msg = f"Requested load `{delta_load_rep}`, higher than initialized maximum delta load `{2.0*self._max_load_rep}`" @@ -669,7 +676,7 @@ def _param_for_lut(self, number_of_bins, max_val): max_val, scale_m = np.meshgrid(max_val, scale) return (max_val * scale_m) - def _rep_abs_and_sign(self, value): + def _representative_value_and_sign(self, value): value = np.asarray(value) value_rep = value if len(value.shape) == 0 else value[0] return np.abs(value_rep), np.sign(value_rep) diff --git a/src/pylife/stress/rainflow/fkm_nonlinear.py b/src/pylife/stress/rainflow/fkm_nonlinear.py index d14a35f3..4f000fb8 100644 --- a/src/pylife/stress/rainflow/fkm_nonlinear.py +++ b/src/pylife/stress/rainflow/fkm_nonlinear.py @@ -36,6 +36,7 @@ STRAIN = 2 EPS_MIN_LF = 3 EPS_MAX_LF = 4 +STRESS_AND_STRAIN = slice(STRESS, STRAIN+1) PRIMARY = 0 SECONDARY = 1 @@ -43,7 +44,7 @@ MEMORY_1_2 = 1 MEMORY_3 = 0 -HYSTORY_COLUMNS = ["load", "stress", "strain", "secondary_branch"] +HISTORY_COLUMNS = ["load", "stress", "strain", "secondary_branch"] HISTORY_INDEX_LEVELS = [ "load_segment", "load_step", "run_index", "turning_point", "hyst_from", "hyst_to", "hyst_close" ] @@ -106,7 +107,7 @@ def __init__(self, recorder, notch_approximation_law, binner=NAL.NotchApproxBinn self._load_max_seen = 0.0 # maximum seen load value self._run_index = 0 # which run through the load sequence is currently performed - self._last_record = None + self._last_deformation_record = None self._residuals_record = _ResidualsRecord() self._residuals = np.array([]) self._record_vals_residuals = pd.DataFrame() @@ -212,17 +213,17 @@ def process(self, samples, flush=False): load_turning_points.groupby(turning_point_idx, sort=False).first() ) - record, hysts = self._perform_hcm_algorithm(load_turning_points_rep) + deform_type_record, hysts = self._perform_hcm_algorithm(load_turning_points_rep) - if self._last_record is None: - self._last_record = np.zeros((5, self._group_size)) + if self._last_deformation_record is None: + self._last_deformation_record = np.zeros((5, self._group_size)) num_turning_points = len(load_turning_points_rep) - record_vals = self._collect_record(load_turning_points, num_turning_points, record) + record_vals = self._process_deformation(load_turning_points, num_turning_points, deform_type_record) - self._store_recordings_for_history(record, record_vals, turning_point_idx, hysts) + self._store_recordings_for_history(deform_type_record, record_vals, turning_point_idx, hysts) - results = self._process_recording(load_turning_points_rep, record_vals, hysts) + results = self._process_hysteresis(record_vals, hysts) results_min, results_max = results self._update_residuals(record_vals, turning_point_idx, load_turning_points_rep) @@ -324,7 +325,33 @@ def _update_residuals(self, record_vals, turning_point, load_turning_points_rep) self._residuals_record.reindex() - def _collect_record(self, load_turning_points, num_turning_points, record): + def _process_deformation(self, load_turning_points, num_turning_points, deform_type_record): + """Calculate the local stress and strain of all turning points + + In ._perform_hcm_algorithm we recorded which turning point is in + PRIMARY deformation regime and which in SECONDARY + + Now we use this information to calculate the local stress and strain + according to the notch approximation law for each turining point for + each point in the mesh. + + Parameters + ---------- + load_turning_points : pd.Series (N * self._group_size) float + The load distribution of all the turning points. It carries all th + index levels of the initial load signal. + + num_turning_points : int + The number of tunring_points + + deform_type_record : np.ndarray(N, 2) int + The inndex and deformation type of each turning point + (see _perform_hcm_algorithm) + + Returns pd.DataFrame + columns: ["load", "stress", "strain", "epsilon_min_LF", "epsilon_max_LF"] + index: the same like load_turning_points + """ def primary(_prev, load): return self._notch_approximation_law.primary(load) @@ -332,34 +359,44 @@ def secondary(prev, load): prev_load = prev[LOAD] delta_L = load - prev_load - delta = self._notch_approximation_law.secondary(delta_L) + delta_stress_strain = self._notch_approximation_law.secondary(delta_L) + + return prev[STRESS_AND_STRAIN].T + delta_stress_strain + + def prev_record_from_residuals(prev_idx): + idx = len(self._record_vals_residuals) + prev_idx*self._group_size + return self._record_vals_residuals.iloc[idx:idx+self._group_size].to_numpy().T - return prev[STRESS:STRAIN+1].T + delta + def prev_record_from_this_run(prev_idx): + idx = prev_idx * self._group_size + return record_vals[:, idx:idx+self._group_size] def determine_prev_record(prev_idx): if prev_idx < 0: - idx = len(self._record_vals_residuals) + prev_idx*self._group_size - return self._record_vals_residuals.iloc[idx:idx+self._group_size].to_numpy().T - if prev_idx < i: - idx = prev_idx * self._group_size - return record_vals[:, idx:idx+self._group_size] - return self._last_record + return prev_record_from_residuals(prev_idx) + if prev_idx == curr_idx: + return self._last_deformation_record + return prev_record_from_this_run(prev_idx) record_vals = np.empty((5, num_turning_points*self._group_size)) turning_points = load_turning_points.to_numpy() - for i in range(num_turning_points): - prev_record = determine_prev_record(record[i, INDEX]) + for curr_idx in range(num_turning_points): + prev_record = determine_prev_record(deform_type_record[curr_idx, INDEX]) - idx = i * self._group_size - load_turning_point = turning_points[idx:idx+self._group_size] + idx = curr_idx * self._group_size + load = turning_points[idx:idx+self._group_size] + + deformation_function = secondary if deform_type_record[curr_idx, SECONDARY] else primary - deformation_function = secondary if record[i, SECONDARY] else primary result_buf = record_vals[:, idx:idx+self._group_size] - self._process_deformation( - deformation_function, result_buf, load_turning_point, prev_record - ) + + result_buf[LOAD] = load + result_buf[STRESS_AND_STRAIN] = deformation_function(prev_record, load).T + + self._calculate_epsilon_LF(result_buf) + self._last_deformation_record = result_buf record_vals = pd.DataFrame( record_vals.T, @@ -373,31 +410,55 @@ def determine_prev_record(prev_idx): record_vals["turning_point"] = np.stack(tp_index).T.flatten() return record_vals.set_index("turning_point", drop=True, append=True) - def _process_deformation(self, deformation_func, result_buf, load, prev_record): - result_buf[0] = load - res = deformation_func(prev_record, load).T - result_buf[1:3] = res + def _calculate_epsilon_LF(self, deformation_record): + """Calculate epsilon_LF values for the current deformation record - old_load = self._last_record[LOAD, 0] + Parameters + ---------- + deformation_record : np.ndarray (5) + [:3] load, stress, strain + [3:] reserved for epsilon_min_LF and epsilon_max_LF + """ + + old_load = self._last_deformation_record[LOAD, 0] + current_load = deformation_record[LOAD, 0] - if old_load < load[0]: - result_buf[EPS_MAX_LF] = ( - self._last_record[EPS_MAX_LF] - if self._last_record[EPS_MAX_LF, 0] > result_buf[STRAIN, 0] - else result_buf[STRAIN, :] + if old_load < current_load: + deformation_record[EPS_MAX_LF] = ( + self._last_deformation_record[EPS_MAX_LF] + if self._last_deformation_record[EPS_MAX_LF, 0] > deformation_record[STRAIN, 0] + else deformation_record[STRAIN, :] ) - result_buf[EPS_MIN_LF] = self._last_record[EPS_MIN_LF] + deformation_record[EPS_MIN_LF] = self._last_deformation_record[EPS_MIN_LF] else: - result_buf[EPS_MIN_LF] = ( - self._last_record[EPS_MIN_LF] - if self._last_record[EPS_MIN_LF, 0] < result_buf[STRAIN, 0] - else result_buf[STRAIN, :] + deformation_record[EPS_MIN_LF] = ( + self._last_deformation_record[EPS_MIN_LF] + if self._last_deformation_record[EPS_MIN_LF, 0] < deformation_record[STRAIN, 0] + else deformation_record[STRAIN, :] ) - result_buf[EPS_MAX_LF] = self._last_record[EPS_MAX_LF] + deformation_record[EPS_MAX_LF] = self._last_deformation_record[EPS_MAX_LF] + - self._last_record = result_buf + def _process_hysteresis(self, record_vals, hysts): + """Calcuclate all the recorded hysteresis values - def _process_recording(self, turning_points, record_vals, hysts): + For each hysteresis we calculate the two records consisting of + load, stress, strain, epsilon_min_LF, epsilon_max_LF + + Parameters + ---------- + record_vals: pd.DataFrame + colimns: load, stress, strain, epsilon_min_LF, epsilon_max_LF + index of load_turning_points + + hysts: np.ndarray(N, 2) + the recorded hysteresis information + (see ._perform_hcm_algorithm) + + Returns + ------- + result_min, result_max: pd.DataFrame + """ def turn_memory_1_2(values, index): if values[0][0, 0] < values[1][0, 0]: return (values[0], values[1], index[0], index[1]) @@ -414,8 +475,6 @@ def turn_memory_3(values, index): memory_functions = [turn_memory_3, turn_memory_1_2] start = len(self._residuals) - if start: - turning_points = np.concatenate((self._residuals, turning_points)) record_vals_with_residuals = pd.concat([self._record_vals_residuals, record_vals]) @@ -516,64 +575,104 @@ def _adjust_samples_and_flush_for_hcm_first_run(self, samples): return samples, flush def _perform_hcm_algorithm(self, load_turning_points): - """Perform the entire HCM algorithm for all load samples""" + """Perform the entire HCM algorithm for all load samples - # iz: number of not yet closed branches - # ir: number of residual loads corresponding to hystereses that cannot be closed, - # because they contain parts of the primary branch + Parameters + ---------- + load_turning_points : np.ndarray + The representative tunring points of the load signal - # iterate over loads from the given list of samples + Returns + ------- + deform_type_record : np.ndarray (N,2) integer + first column: index in the turning point array + second column: indicate whether the deformation is in PRIMARY or SECONDARY regime + + hysts : np.ndarray (N,4) integer + first column: Type of hysteresis memort (MEMORY_1_2 or MEMORY_3) + second column: index in turning point array of the hysteresis origin + third column: index in turning point array of the hysteresis front + fourth column: index in turning point array of the hysteresis + closing point (-1 if hysteresis not closed) + """ hysts = np.zeros((len(load_turning_points), 4), dtype=np.int64) - hyst_index = 0 + hyst_ptr = 0 - record = -np.ones((len(load_turning_points), 2), dtype=np.int64) - rec_index = 0 + deform_type_record = -np.ones((len(load_turning_points), 2), dtype=np.int64) + deform_ptr = 0 for index, current_load in enumerate(load_turning_points): - hyst_index = self._hcm_process_sample(current_load, index, hysts, hyst_index, record, rec_index) + hyst_ptr = self._hcm_process_sample( + current_load, index, hysts, hyst_ptr, deform_type_record[deform_ptr, :] + ) if np.abs(current_load) > self._load_max_seen: self._load_max_seen = np.abs(current_load) self._iz += 1 - self._residuals_record.append(rec_index, current_load) + self._residuals_record.append(deform_ptr, current_load) + + deform_ptr += 1 + + hysts = hysts[:hyst_ptr, :] + return deform_type_record, hysts + + def _hcm_process_sample( + self, + current_load, + current_idx, + hysts, + hyst_ptr, + deform_type_record, + ): + """Process one sample in the HCM algorithm, i.e., one load value + + Parameters + ---------- + current_load: float + The current representative load - rec_index += 1 + current_index: int + The index of the current load in the turning point list - hysts = hysts[:hyst_index, :] - return record, hysts + hysts: np.ndarray (N,4) integer + The hysteresis record (see ._perform_hcm_algorithm) - def _hcm_process_sample(self, current_load, current_index, hysts, hyst_index, record, rec_index): - """ Process one sample in the HCM algorithm, i.e., one load value """ + hyst_ptr: int + The pointer to the next hysteresis record + + deform_type_record : np.ndarray (N,2) integer + The deformation type record (see ._perform_hcm_algorithm) + """ - record_index = current_index + record_idx = current_idx while True: if self._iz == self._ir: if np.abs(current_load) > self._load_max_seen: # case a) i, "Memory 3" - record[rec_index, :] = [record_index, PRIMARY] + deform_type_record[:] = [record_idx, PRIMARY] residuals_idx = self._residuals_record.current_index - hysts[hyst_index, :] = [MEMORY_3, residuals_idx, current_index, -1] - hyst_index += 1 + hysts[hyst_ptr, :] = [MEMORY_3, residuals_idx, current_idx, -1] + hyst_ptr += 1 self._ir += 1 else: - record[rec_index, :] = [record_index, SECONDARY] + deform_type_record[:] = [record_idx, SECONDARY] break if self._iz < self._ir: - record[rec_index, :] = [record_index, PRIMARY] + deform_type_record[:] = [record_idx, PRIMARY] break # here we have iz > ir: if self._residuals_record.will_remain_open_by(current_load): - record[rec_index, :] = [record_index, SECONDARY] + deform_type_record[:] = [record_idx, SECONDARY] break # no -> we have a new hysteresis @@ -582,7 +681,7 @@ def _hcm_process_sample(self, current_load, current_index, hysts, hyst_index, re prev_idx_0, prev_load_0 = self._residuals_record.pop() if len(self._residuals_record): - record_index = self._residuals_record.current_index + record_idx = self._residuals_record.current_index self._iz -= 2 @@ -594,8 +693,8 @@ def _hcm_process_sample(self, current_load, current_index, hysts, hyst_index, re # the primary branch is not yet reached, continue processing residual loads, potentially # closing even more hysteresis - hysts[hyst_index, :] = [MEMORY_1_2, prev_idx_0, prev_idx_1, current_index] - hyst_index += 1 + hysts[hyst_ptr, :] = [MEMORY_1_2, prev_idx_0, prev_idx_1, current_idx] + hyst_ptr += 1 continue @@ -609,11 +708,11 @@ def _hcm_process_sample(self, current_load, current_index, hysts, hyst_index, re # Proceed on primary path for the rest, which was not part of the closed hysteresis - record[rec_index, :] = [record_index, PRIMARY] - hysts[hyst_index, :] = [MEMORY_1_2, prev_idx_0, prev_idx_1, current_index] - hyst_index += 1 + deform_type_record[:] = [record_idx, PRIMARY] + hysts[hyst_ptr, :] = [MEMORY_1_2, prev_idx_0, prev_idx_1, current_idx] + hyst_ptr += 1 - return hyst_index + return hyst_ptr @property def strain_values(self): @@ -745,8 +844,8 @@ def history(self): ["turning_point", "load_step", "hyst_from", "hyst_to"], ] = -1 history.loc[turning_point_drop_idx, "secondary_branch"] = True - history.loc[negate, HYSTORY_COLUMNS] = -history.loc[ - negate, HYSTORY_COLUMNS + history.loc[negate, HISTORY_COLUMNS] = -history.loc[ + negate, HISTORY_COLUMNS ] history.loc[negate, "hyst_to"] = history.loc[negate + 1, "hyst_to"].to_numpy() history.loc[negate + 1, "hyst_to"] = -1