有关 jMetalPy 的使用,可以参考官方手册(不完整,正在完善中):https://jmetal.github.io/jMetalPy/index.html ,以及 GitHub 网页说明:https://github.com/jMetal/jMetalPy 。本文使用的 jMetalPy 版本为 1.5.5。
# 一、一些使用技巧
(1)在开始编写 jMetalPy 程序时,可以参考从 GitHub 下载的 jMetalPy-main/examples 目录中的案例:找到与待解决问题对应的案例,在两个(或一个)案例的基础上进行更改;
(2)第一个案例:先确定问题是单目标还是多目标,再选定要使用的算法,据此找到一个案例;第二个案例:判断待解决问题的类型(如整数规划、TSP 路径规划、0-1 规划等),在全部案例中搜索对应问题类型的案例,据此找到第二个案例;
(3)找到对应案例之后,只需要将其中的 problem 更改为我们自己的问题即可。
# 二、MOEA/D 库文件更改
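当使用 MOEA/D 算法求解序列(排列)问题时,如果遇到报错 `Exception: The number of parents is not two: 3`,则需要更改 jMetalPy 的库代码。原因是 MOEAD.selection 在选出的两个父代之外,又追加了当前子问题的解,于是传给交叉算子的父代有 3 个,而 PMXCrossover 这类交叉算子只接受两个父代。找到 site-packages\jmetal\algorithm\multiobjective\moead.py,将第一个 `mating_population.append(population[self.current_subproblem])`(位于 MOEAD.selection 中)注释掉。最终代码如下: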
import copy | |
import random | |
from math import ceil | |
from typing import TypeVar, List, Generator | |
import numpy as np | |
from jmetal.algorithm.singleobjective.genetic_algorithm import GeneticAlgorithm | |
from jmetal.config import store | |
from jmetal.core.operator import Mutation | |
from jmetal.core.problem import Problem | |
from jmetal.operator import DifferentialEvolutionCrossover, NaryRandomSolutionSelection | |
from jmetal.util.aggregative_function import AggregativeFunction | |
from jmetal.util.constraint_handling import feasibility_ratio, \ | |
overall_constraint_violation_degree, is_feasible | |
from jmetal.util.density_estimator import CrowdingDistance | |
from jmetal.util.evaluator import Evaluator | |
from jmetal.util.neighborhood import WeightVectorNeighborhood | |
from jmetal.util.ranking import FastNonDominatedRanking | |
from jmetal.util.termination_criterion import TerminationCriterion, StoppingByEvaluations | |
# Generic placeholders used by the type hints in this module.
S = TypeVar('S')  # type variable standing for a solution type
R = List[S]  # alias for a list of S
class MOEAD(GeneticAlgorithm):
    """MOEA/D, patched so that ``selection`` returns only the selected parents.

    The stock implementation appends the solution of the current subproblem
    to the two parents picked by the selection operator, which makes a
    two-parent crossover fail with
    ``Exception: The number of parents is not two: 3``.  In this version that
    append is disabled (see ``selection``).
    """

    def __init__(self,
                 problem: Problem,
                 population_size: int,
                 mutation: Mutation,
                 crossover: DifferentialEvolutionCrossover,
                 aggregative_function: AggregativeFunction,
                 neighbourhood_selection_probability: float,
                 max_number_of_replaced_solutions: int,
                 neighbor_size: int,
                 weight_files_path: str,
                 termination_criterion: TerminationCriterion = store.default_termination_criteria,
                 population_generator: Generator = store.default_generator,
                 population_evaluator: Evaluator = store.default_evaluator):
        """
        :param problem: Problem to solve.
        :param population_size: Number of solutions, also used as the number of weight vectors.
        :param mutation: Mutation operator applied to the single offspring.
        :param crossover: Crossover operator applied to the mating population.
        :param aggregative_function: Scalarising function; stored as ``fitness_function``.
        :param neighbourhood_selection_probability: Probability of mating with a solution in the neighborhood rather
               than the entire population (Delta in Zhang & Li paper).
        :param max_number_of_replaced_solutions: (eta in Zhang & Li paper).
        :param neighbor_size: Neighbourhood size passed to ``WeightVectorNeighborhood``.
        :param weight_files_path: Path passed to ``WeightVectorNeighborhood`` as ``weights_path``.
        :param termination_criterion: Stopping condition.
        :param population_generator: Generator of the initial population.
        :param population_evaluator: Evaluator of the populations.
        """
        super(MOEAD, self).__init__(
            problem=problem,
            population_size=population_size,
            offspring_population_size=1,
            mutation=mutation,
            crossover=crossover,
            selection=NaryRandomSolutionSelection(2),
            population_evaluator=population_evaluator,
            population_generator=population_generator,
            termination_criterion=termination_criterion
        )
        self.max_number_of_replaced_solutions = max_number_of_replaced_solutions
        self.fitness_function = aggregative_function
        self.neighbourhood = WeightVectorNeighborhood(
            number_of_weight_vectors=population_size,
            neighborhood_size=neighbor_size,
            weight_vector_size=problem.number_of_objectives,
            weights_path=weight_files_path
        )
        self.neighbourhood_selection_probability = neighbourhood_selection_probability
        self.permutation = None
        self.current_subproblem = 0
        self.neighbor_type = None

    def init_progress(self) -> None:
        """Initialise counters, the aggregative function and the subproblem order."""
        self.evaluations = self.population_size
        for solution in self.solutions:
            self.fitness_function.update(solution.objectives)

        self.permutation = Permutation(self.population_size)

        observable_data = self.get_observable_data()
        self.observable.notify_all(**observable_data)

    def selection(self, population: List[S]):
        """Pick the next subproblem and return the parents chosen by the selection operator."""
        self.current_subproblem = self.permutation.get_next_value()
        self.neighbor_type = self.choose_neighbor_type()

        if self.neighbor_type == 'NEIGHBOR':
            neighbors = self.neighbourhood.get_neighbors(self.current_subproblem, population)
            mating_population = self.selection_operator.execute(neighbors)
        else:
            mating_population = self.selection_operator.execute(population)

        # Intentionally disabled: appending the current subproblem's solution
        # yields three parents, which two-parent crossovers reject.
        # mating_population.append(population[self.current_subproblem])
        return mating_population

    def reproduction(self, mating_population: List[S]) -> List[S]:
        """Cross the parents and mutate the first offspring."""
        self.crossover_operator.current_individual = self.solutions[self.current_subproblem]

        offspring_population = self.crossover_operator.execute(mating_population)
        self.mutation_operator.execute(offspring_population[0])

        return offspring_population

    def replacement(self, population: List[S], offspring_population: List[S]) -> List[S]:
        """Update the aggregative function with the offspring and insert it where it improves."""
        new_solution = offspring_population[0]
        self.fitness_function.update(new_solution.objectives)
        return self.update_current_subproblem_neighborhood(new_solution, population)

    def update_current_subproblem_neighborhood(self, new_solution, population):
        """Replace up to ``max_number_of_replaced_solutions`` solutions that ``new_solution`` beats."""
        replacements = 0
        for k in self.generate_permutation_of_neighbors(self.current_subproblem):
            weights = self.neighbourhood.weight_vectors[k]
            f1 = self.fitness_function.compute(population[k].objectives, weights)
            f2 = self.fitness_function.compute(new_solution.objectives, weights)

            if f2 < f1:
                population[k] = copy.deepcopy(new_solution)
                replacements += 1

            if replacements >= self.max_number_of_replaced_solutions:
                break

        return population

    def generate_permutation_of_neighbors(self, subproblem_id):
        """Return the indexes to visit: the neighbourhood of ``subproblem_id`` or a full permutation."""
        if self.neighbor_type == 'NEIGHBOR':
            neighbors = self.neighbourhood.get_neighborhood()[subproblem_id]
            return copy.deepcopy(neighbors.tolist())
        return Permutation(self.population_size).get_permutation()

    def choose_neighbor_type(self):
        """Return 'NEIGHBOR' with probability ``neighbourhood_selection_probability``, else 'POPULATION'."""
        if random.random() < self.neighbourhood_selection_probability:
            return 'NEIGHBOR'
        return 'POPULATION'

    def get_name(self):
        return 'MOEAD'

    def get_result(self):
        return self.solutions
class MOEAD_DRA(MOEAD):
    """MOEA/D variant that chooses which subproblems to process from per-subproblem utilities.

    Each "tour" (``self.order``) lists the subproblems to process; one entry
    is consumed per algorithm step.  Utilities are refreshed every 30 steps.
    """

    def __init__(self, problem, population_size, mutation, crossover, aggregative_function,
                 neighbourhood_selection_probability, max_number_of_replaced_solutions, neighbor_size,
                 weight_files_path, termination_criterion=store.default_termination_criteria,
                 population_generator=store.default_generator, population_evaluator=store.default_evaluator):
        """Take the same parameters as ``MOEAD.__init__`` and forward them unchanged."""
        super(MOEAD_DRA, self).__init__(problem, population_size, mutation, crossover, aggregative_function,
                                        neighbourhood_selection_probability, max_number_of_replaced_solutions,
                                        neighbor_size, weight_files_path,
                                        termination_criterion=termination_criterion,
                                        population_generator=population_generator,
                                        population_evaluator=population_evaluator)
        self.saved_values = []  # snapshot of the solutions at the last utility update
        self.utility = [1.0 for _ in range(population_size)]
        self.frequency = [0.0 for _ in range(population_size)]  # times each subproblem was selected
        self.generation_counter = 0
        self.order = []  # subproblem indexes of the current tour
        self.current_order_index = 0

    def init_progress(self):
        """Initialise the parent state, the utility snapshot and the first tour."""
        super().init_progress()

        self.saved_values = [copy.copy(solution) for solution in self.solutions]

        # NOTE(review): the parent init_progress already performs these steps;
        # they are kept to preserve the existing behaviour.
        self.evaluations = self.population_size
        for solution in self.solutions:
            self.fitness_function.update(solution.objectives)

        self.order = self.__tour_selection(10)
        self.current_order_index = 0

        observable_data = self.get_observable_data()
        self.observable.notify_all(**observable_data)

    def update_progress(self):
        """Advance to the next tour entry; start a new tour and refresh utilities when due."""
        super().update_progress()

        # This is the only place the tour index advances (once per step).
        self.current_order_index += 1
        if self.current_order_index >= len(self.order):
            self.order = self.__tour_selection(10)
            self.current_order_index = 0

        self.generation_counter += 1
        if self.generation_counter % 30 == 0:
            self.__utility_function()

    def selection(self, population: List[S]):
        """Select parents for the current tour entry and append that subproblem's own solution."""
        # Fix: the index used to be incremented here as well as in
        # update_progress, which skipped every other tour entry and ran past
        # the end of the tour whenever its length was odd.
        self.current_subproblem = self.order[self.current_order_index]
        self.frequency[self.current_subproblem] += 1

        self.neighbor_type = self.choose_neighbor_type()
        if self.neighbor_type == 'NEIGHBOR':
            neighbors = self.neighbourhood.get_neighbors(self.current_subproblem, population)
            mating_population = self.selection_operator.execute(neighbors)
        else:
            mating_population = self.selection_operator.execute(population)

        mating_population.append(population[self.current_subproblem])
        return mating_population

    def get_name(self):
        return 'MOEAD-DRA'

    def __utility_function(self):
        """Recompute each subproblem's utility from its improvement since the last snapshot."""
        for i, solution in enumerate(self.solutions):
            weights = self.neighbourhood.weight_vectors[i]
            f1 = self.fitness_function.compute(solution.objectives, weights)
            f2 = self.fitness_function.compute(self.saved_values[i].objectives, weights)
            delta = f2 - f1

            if delta > 0.001:
                self.utility[i] = 1.0
            else:
                utility_value = (0.95 + (0.05 * delta / 0.001)) * self.utility[i]
                self.utility[i] = utility_value if utility_value < 1.0 else 1.0

            self.saved_values[i] = copy.copy(solution)

    def __tour_selection(self, depth):
        """Build a tour of ``int(population_size / 5)`` subproblems.

        The first ``number_of_objectives`` subproblems are always included;
        the rest are picked by tournaments of size ``depth`` on utility.
        """
        number_of_objectives = self.problem.number_of_objectives
        selected = list(range(number_of_objectives))
        candidate = list(range(number_of_objectives, self.population_size))

        while len(selected) < int(self.population_size / 5.0):
            best_idd = int(random.random() * len(candidate))
            best_sub = candidate[best_idd]
            for _ in range(1, depth):
                i2 = int(random.random() * len(candidate))
                s2 = candidate[i2]
                if self.utility[s2] > self.utility[best_sub]:
                    best_idd = i2
                    best_sub = s2
            selected.append(best_sub)
            del candidate[best_idd]

        return selected
class MOEADIEpsilon(MOEAD):
    """MOEA/D variant that handles constraints with an adaptive epsilon level and an external archive."""

    def __init__(self,
                 problem: Problem,
                 population_size: int,
                 mutation: Mutation,
                 crossover: DifferentialEvolutionCrossover,
                 aggregative_function: AggregativeFunction,
                 neighbourhood_selection_probability: float,
                 max_number_of_replaced_solutions: int,
                 neighbor_size: int,
                 weight_files_path: str,
                 termination_criterion: TerminationCriterion = StoppingByEvaluations(300000),
                 population_generator: Generator = store.default_generator,
                 population_evaluator: Evaluator = store.default_evaluator):
        """
        :param max_number_of_replaced_solutions: (eta in Zhang & Li paper).
        :param neighbourhood_selection_probability: Probability of mating with a solution in the neighborhood rather
               than the entire population (Delta in Zhang & Li paper).
        :param termination_criterion: Stopping condition. The default is one object created when the class is
               defined, so it is shared by every instance that does not pass its own.
        """
        super(MOEADIEpsilon, self).__init__(
            problem=problem,
            population_size=population_size,
            mutation=mutation,
            crossover=crossover,
            aggregative_function=aggregative_function,
            neighbourhood_selection_probability=neighbourhood_selection_probability,
            max_number_of_replaced_solutions=max_number_of_replaced_solutions,
            neighbor_size=neighbor_size,
            weight_files_path=weight_files_path,
            population_evaluator=population_evaluator,
            population_generator=population_generator,
            termination_criterion=termination_criterion
        )
        self.constraints = []
        self.epsilon_k = 0
        self.phi_max = -1e30
        self.epsilon_zero = 0
        self.tc = 800  # generation from which epsilon_k is forced to 0
        self.tao = 0.05  # relative step used when adapting epsilon_k
        self.rk = 0  # feasibility ratio of the current population
        self.generation_counter = 0
        self.archive = []

    def init_progress(self) -> None:
        """Initialise the parent state and derive the initial epsilon level from the population."""
        super().init_progress()

        self.constraints = [overall_constraint_violation_degree(self.solutions[i])
                            for i in range(0, self.population_size)]
        # Fix: the original called sorted() and discarded the result, so the
        # two lookups below read an unsorted list.
        self.constraints.sort()

        # Clamp so that a one-element population does not index past the end.
        index = min(int(ceil(0.05 * self.population_size)), len(self.constraints) - 1)
        self.epsilon_zero = abs(self.constraints[index])

        if self.phi_max < abs(self.constraints[0]):
            self.phi_max = abs(self.constraints[0])

        self.rk = feasibility_ratio(self.solutions)
        self.epsilon_k = self.epsilon_zero

    def update_progress(self) -> None:
        """Once per ``population_size`` evaluations, update the archive and adapt ``epsilon_k``."""
        super().update_progress()

        if self.evaluations % self.population_size == 0:
            self.update_external_archive()
            self.generation_counter += 1
            self.rk = feasibility_ratio(self.solutions)
            if self.generation_counter >= self.tc:
                self.epsilon_k = 0
            elif self.rk < 0.95:
                self.epsilon_k = (1 - self.tao) * self.epsilon_k
            else:
                self.epsilon_k = self.phi_max * (1 + self.tao)

    def update_current_subproblem_neighborhood(self, new_solution, population):
        """Insert ``new_solution`` where it wins under the epsilon-constrained comparison."""
        new_violation = overall_constraint_violation_degree(new_solution)
        # NOTE(review): init_progress compares phi_max against an absolute
        # value, this check does not; left as in the original.
        if self.phi_max < new_violation:
            self.phi_max = new_violation

        # new_solution is not modified inside the loop, so this is invariant.
        cons2 = abs(new_violation)

        replacements = 0
        for k in self.generate_permutation_of_neighbors(self.current_subproblem):
            weights = self.neighbourhood.weight_vectors[k]
            f1 = self.fitness_function.compute(population[k].objectives, weights)
            f2 = self.fitness_function.compute(new_solution.objectives, weights)

            cons1 = abs(overall_constraint_violation_degree(self.solutions[k]))

            if cons1 < self.epsilon_k and cons2 <= self.epsilon_k:
                # Both within the epsilon level: compare by fitness.
                replace = f2 < f1
            elif cons1 == cons2:
                # Same violation: compare by fitness.
                replace = f2 < f1
            else:
                # Otherwise the smaller violation wins.
                replace = cons2 < cons1

            if replace:
                population[k] = copy.deepcopy(new_solution)
                replacements += 1

            if replacements >= self.max_number_of_replaced_solutions:
                break

        return population

    def update_external_archive(self):
        """Merge the feasible solutions into the archive, keeping at most ``population_size`` of rank 0."""
        feasible_solutions = [copy.deepcopy(solution) for solution in self.solutions if is_feasible(solution)]
        if not feasible_solutions:
            return

        feasible_solutions = feasible_solutions + self.archive
        ranking = FastNonDominatedRanking()
        ranking.compute_ranking(feasible_solutions)

        first_rank_solutions = ranking.get_subfront(0)
        if len(first_rank_solutions) > self.population_size:
            crowding_distance = CrowdingDistance()
            # Drop the solution with the smallest crowding distance, one at a
            # time, recomputing the distances after each removal.
            while len(first_rank_solutions) > self.population_size:
                crowding_distance.compute_density_estimator(first_rank_solutions)
                first_rank_solutions = sorted(first_rank_solutions,
                                              key=lambda x: x.attributes['crowding_distance'],
                                              reverse=True)
                first_rank_solutions.pop()

        self.archive = [copy.deepcopy(solution) for solution in first_rank_solutions]

    def get_result(self):
        return self.archive
class Permutation:
    """Endless source of indexes drawn from random permutations of ``range(length)``."""

    def __init__(self, length: int):
        """
        :param length: Number of indexes in each permutation.
        """
        self.counter = 0  # position of the next value to hand out
        self.length = length
        self.permutation = np.random.permutation(length)

    def get_next_value(self):
        """Return the next index; reshuffle once the current permutation is exhausted."""
        next_value = self.permutation[self.counter]
        self.counter += 1

        if self.counter == self.length:
            self.permutation = np.random.permutation(self.length)
            self.counter = 0

        return next_value

    def get_permutation(self):
        """Return the current permutation as a plain list."""
        return self.permutation.tolist()
# 三、成功使用案例
import random | |
from jmetal.lab.visualization import Plot | |
from jmetal.algorithm.multiobjective.nsgaii import NSGAII | |
from jmetal.core.problem import PermutationProblem | |
from jmetal.core.solution import PermutationSolution | |
from parameter import * | |
from functions import * | |
import numpy as np | |
from jmetal.operator import BinaryTournamentSelection | |
from jmetal.operator.crossover import PMXCrossover | |
from jmetal.operator.mutation import PermutationSwapMutation | |
from jmetal.util.comparator import MultiComparator | |
from jmetal.util.density_estimator import CrowdingDistance | |
from jmetal.util.ranking import FastNonDominatedRanking | |
from jmetal.util.termination_criterion import StoppingByEvaluations | |
from jmetal.util.solution import ( | |
get_non_dominated_solutions, | |
print_function_values_to_file, | |
print_variables_to_file, | |
) | |
""" | |
Program to configure and run the NSGA-II algorithm configured with standard settings. | |
""" | |
class RMFS(PermutationProblem):
    """RMFS pod/robot scheduling problem encoded as a permutation of pods.

    A solution is a permutation of ``range(n_pod)``.  ``evaluate`` computes
    five candidate target values and copies the ones named in
    ``objectives_list`` (1-based) into the solution's objectives.
    """

    NUMBER_OF_TARGETS = 5  # length of the ``target`` list built in evaluate()

    def __init__(self, objectives_list):
        """
        :param objectives_list: 1-based indexes (1-5) of the targets to optimise, in objective order.
        :raises ValueError: If an index is outside 1..5.
        """
        invalid = [objective for objective in objectives_list
                   if not 1 <= objective <= self.NUMBER_OF_TARGETS]
        if invalid:
            # Without this check 0 or a negative index would silently select
            # a different target through negative list indexing.
            raise ValueError(f"objective indexes must be in 1..{self.NUMBER_OF_TARGETS}, got {invalid}")

        super(RMFS, self).__init__()

        self.number_of_objectives = len(objectives_list)
        self.objectives_list = objectives_list
        self.obj_directions = [self.MINIMIZE for _ in range(self.number_of_objectives)]
        self.number_of_variables = n_pod
        self.number_of_constraints = 0

    def evaluate(self, solution: PermutationSolution) -> PermutationSolution:
        """Schedule the selected pods on the robots and fill in ``solution.objectives``."""
        # x[i, r, k] == 1: pod i is the k-th task carried by robot r.
        x = np.zeros((n_pod, n_robot, n_pod), dtype=int)
        # Time pod i is picked up by a robot; -1 means the pod is not moved.
        # Only needed by the optional constraint check mentioned below.
        tP = -1 * np.ones(n_pod)
        # Time pod i is put down by a robot; -1 means the pod is not moved.
        tD = -1 * np.ones(n_pod)
        # Time at which each robot finishes its current work.
        robot_condition = np.zeros(n_robot)
        # Number of tasks assigned to each robot so far.
        n_robot_work = np.zeros(n_robot, dtype=int)

        selected_pods = get_selected_pods(A, n_commodity_type, n_pod, B, solution.variables)
        for pod in selected_pods:
            robot = select_robot(robot_condition)
            tP[pod] = robot_condition[robot] + Tp[pod]
            tD[pod] = tP[pod] + Td[pod]
            robot_condition[robot] = robot_condition[robot] + Tp[pod] + Td[pod]
            x[pod, robot, n_robot_work[robot]] = 1
            n_robot_work[robot] = n_robot_work[robot] + 1

        y = get_y(tD, selected_pods)
        z = get_z(y, np.array(selected_pods), np.array(A), np.array(B))
        tH = get_tH(y, z, selected_pods, tD, Th)
        tO = get_tO(y, z, np.array(selected_pods), B, Th, tH)

        # Shared by targets 1 and 2; previously computed twice.
        finish_times = tH + Th * np.sum(np.sum(z, axis=1), axis=1)
        target = [round(max(finish_times), decimal_Th),
                  round(float(np.sum(finish_times)), decimal_Th),
                  np.sum(y),
                  round(g_target4(Tp, Td, x), decimal_Th),
                  round(sum(tO), decimal_Th)]

        # A feasibility check can be enabled here with
        # judge_constraint(x, y, z, tP, tD, tH, selected_pods).

        for i, objective in enumerate(self.objectives_list):
            solution.objectives[i] = target[objective - 1]

        return solution

    def create_solution(self) -> PermutationSolution:
        """Return a new solution whose variables are a random permutation of the pod indexes."""
        new_solution = PermutationSolution(
            number_of_variables=self.number_of_variables, number_of_objectives=self.number_of_objectives
        )
        new_solution.variables = random.sample(range(self.number_of_variables), k=self.number_of_variables)
        return new_solution

    @property
    def number_of_cities(self):
        """Alias of ``number_of_variables``."""
        return self.number_of_variables

    def get_name(self):
        return "RMFS"
if __name__ == "__main__":
    # Solve RMFS with NSGA-II for the chosen targets (1-based indexes, 1-5).
    objectives_list = [3, 5]
    problem = RMFS(objectives_list)
    print("n_pod: ", problem.number_of_variables)

    max_evaluations = 10000
    algorithm = NSGAII(
        problem=problem,
        population_size=100,
        offspring_population_size=100,
        mutation=PermutationSwapMutation(1.0 / problem.number_of_variables),
        crossover=PMXCrossover(0.8),
        selection=BinaryTournamentSelection(
            MultiComparator([FastNonDominatedRanking.get_comparator(), CrowdingDistance.get_comparator()])
        ),
        termination_criterion=StoppingByEvaluations(max_evaluations=max_evaluations),
    )
    algorithm.run()
    front = get_non_dominated_solutions(algorithm.get_result())

    # Save results to file
    print_function_values_to_file(front, "FUN." + algorithm.label)
    print_variables_to_file(front, "VAR." + algorithm.label)

    # The front is only plotted for two or three objectives.
    if 2 <= len(objectives_list) <= 3:
        plot_front = Plot(title='Pareto front approximation', axis_labels=['x', 'y'])
        # Label derived from the run instead of the copied 'NSGAII-ZDT1'.
        plot_front.plot(front, label=f"{algorithm.get_name()}-{problem.get_name()}")

    print(f"Algorithm: {algorithm.get_name()}")
    print(f"Problem: {problem.get_name()}")
    if len(objectives_list) == 1:
        print("Solution: {}".format(front[0].variables))
        print("Fitness: {}".format(front[0].objectives[0]))
    print(f"Computing time: {algorithm.total_computing_time}")
import random | |
from jmetal.lab.visualization import Plot | |
from jmetal.core.problem import PermutationProblem | |
from jmetal.core.solution import PermutationSolution | |
from parameter import * | |
from functions import * | |
import numpy as np | |
from jmetal.operator.crossover import PMXCrossover | |
from jmetal.operator.mutation import PermutationSwapMutation | |
from jmetal.util.termination_criterion import StoppingByEvaluations | |
from jmetal.util.solution import ( | |
get_non_dominated_solutions, | |
print_function_values_to_file, | |
print_variables_to_file, | |
) | |
from jmetal.algorithm.multiobjective.nsgaiii import ( | |
NSGAIII, | |
UniformReferenceDirectionFactory, | |
) | |
class RMFS(PermutationProblem):
    """RMFS pod/robot scheduling problem encoded as a permutation of pods.

    A solution is a permutation of ``range(n_pod)``.  ``evaluate`` computes
    five candidate target values and copies the ones named in
    ``objectives_list`` (1-based) into the solution's objectives.
    """

    NUMBER_OF_TARGETS = 5  # length of the ``target`` list built in evaluate()

    def __init__(self, objectives_list):
        """
        :param objectives_list: 1-based indexes (1-5) of the targets to optimise, in objective order.
        :raises ValueError: If an index is outside 1..5.
        """
        invalid = [objective for objective in objectives_list
                   if not 1 <= objective <= self.NUMBER_OF_TARGETS]
        if invalid:
            # Without this check 0 or a negative index would silently select
            # a different target through negative list indexing.
            raise ValueError(f"objective indexes must be in 1..{self.NUMBER_OF_TARGETS}, got {invalid}")

        super(RMFS, self).__init__()

        self.number_of_objectives = len(objectives_list)
        self.objectives_list = objectives_list
        self.obj_directions = [self.MINIMIZE for _ in range(self.number_of_objectives)]
        self.number_of_variables = n_pod
        self.number_of_constraints = 0

    def evaluate(self, solution: PermutationSolution) -> PermutationSolution:
        """Schedule the selected pods on the robots and fill in ``solution.objectives``."""
        # x[i, r, k] == 1: pod i is the k-th task carried by robot r.
        x = np.zeros((n_pod, n_robot, n_pod), dtype=int)
        # Time pod i is picked up by a robot; -1 means the pod is not moved.
        # Only needed by the optional constraint check mentioned below.
        tP = -1 * np.ones(n_pod)
        # Time pod i is put down by a robot; -1 means the pod is not moved.
        tD = -1 * np.ones(n_pod)
        # Time at which each robot finishes its current work.
        robot_condition = np.zeros(n_robot)
        # Number of tasks assigned to each robot so far.
        n_robot_work = np.zeros(n_robot, dtype=int)

        selected_pods = get_selected_pods(A, n_commodity_type, n_pod, B, solution.variables)
        for pod in selected_pods:
            robot = select_robot(robot_condition)
            tP[pod] = robot_condition[robot] + Tp[pod]
            tD[pod] = tP[pod] + Td[pod]
            robot_condition[robot] = robot_condition[robot] + Tp[pod] + Td[pod]
            x[pod, robot, n_robot_work[robot]] = 1
            n_robot_work[robot] = n_robot_work[robot] + 1

        y = get_y(tD, selected_pods)
        z = get_z(y, np.array(selected_pods), np.array(A), np.array(B))
        tH = get_tH(y, z, selected_pods, tD, Th)
        tO = get_tO(y, z, np.array(selected_pods), B, Th, tH)

        # Shared by targets 1 and 2; previously computed twice.
        finish_times = tH + Th * np.sum(np.sum(z, axis=1), axis=1)
        target = [round(max(finish_times), decimal_Th),
                  round(float(np.sum(finish_times)), decimal_Th),
                  np.sum(y),
                  round(g_target4(Tp, Td, x), decimal_Th),
                  round(sum(tO), decimal_Th)]

        # A feasibility check can be enabled here with
        # judge_constraint(x, y, z, tP, tD, tH, selected_pods).

        for i, objective in enumerate(self.objectives_list):
            solution.objectives[i] = target[objective - 1]

        return solution

    def create_solution(self) -> PermutationSolution:
        """Return a new solution whose variables are a random permutation of the pod indexes."""
        new_solution = PermutationSolution(
            number_of_variables=self.number_of_variables, number_of_objectives=self.number_of_objectives
        )
        new_solution.variables = random.sample(range(self.number_of_variables), k=self.number_of_variables)
        return new_solution

    @property
    def number_of_cities(self):
        """Alias of ``number_of_variables``."""
        return self.number_of_variables

    def get_name(self):
        return "RMFS"
if __name__ == "__main__":
    # Solve RMFS with NSGA-III for the chosen targets (1-based indexes, 1-5).
    objectives_list = [1, 2, 3, 4, 5]
    problem = RMFS(objectives_list)
    print("n_pod: ", problem.number_of_variables)

    max_evaluations = 20000
    algorithm = NSGAIII(
        problem=problem,
        population_size=92,
        reference_directions=UniformReferenceDirectionFactory(len(objectives_list), n_points=91),
        mutation=PermutationSwapMutation(1.0 / problem.number_of_variables),
        crossover=PMXCrossover(0.8),
        termination_criterion=StoppingByEvaluations(max_evaluations=max_evaluations),
    )
    algorithm.run()
    front = get_non_dominated_solutions(algorithm.get_result())

    # Save results to file
    print_function_values_to_file(front, "FUN." + algorithm.label)
    print_variables_to_file(front, "VAR." + algorithm.label)

    # The front is only plotted for two or three objectives.
    if 2 <= len(objectives_list) <= 3:
        plot_front = Plot(title='Pareto front approximation', axis_labels=['x', 'y'])
        # Label derived from the run instead of the copied 'NSGAII-ZDT1'.
        plot_front.plot(front, label=f"{algorithm.get_name()}-{problem.get_name()}")

    print(f"Algorithm: {algorithm.get_name()}")
    print(f"Problem: {problem.get_name()}")
    if len(objectives_list) == 1:
        print("Solution: {}".format(front[0].variables))
        print("Fitness: {}".format(front[0].objectives[0]))
    print(f"Computing time: {algorithm.total_computing_time}")
import random | |
from jmetal.lab.visualization import Plot | |
from jmetal.core.problem import PermutationProblem | |
from jmetal.core.solution import PermutationSolution | |
from parameter import * | |
from functions import * | |
import numpy as np | |
from jmetal.operator.crossover import PMXCrossover | |
from jmetal.operator.mutation import PermutationSwapMutation | |
from jmetal.util.termination_criterion import StoppingByEvaluations | |
from jmetal.util.solution import ( | |
get_non_dominated_solutions, | |
print_function_values_to_file, | |
print_variables_to_file, | |
) | |
from jmetal.algorithm.multiobjective.moead import MOEAD | |
from jmetal.util.aggregative_function import Tschebycheff | |
class RMFS(PermutationProblem):
    """RMFS pod/robot scheduling problem encoded as a permutation of pods.

    A solution is a permutation of ``range(n_pod)``.  ``evaluate`` computes
    five candidate target values and copies the ones named in
    ``objectives_list`` (1-based) into the solution's objectives.
    """

    NUMBER_OF_TARGETS = 5  # length of the ``target`` list built in evaluate()

    def __init__(self, objectives_list):
        """
        :param objectives_list: 1-based indexes (1-5) of the targets to optimise, in objective order.
        :raises ValueError: If an index is outside 1..5.
        """
        invalid = [objective for objective in objectives_list
                   if not 1 <= objective <= self.NUMBER_OF_TARGETS]
        if invalid:
            # Without this check 0 or a negative index would silently select
            # a different target through negative list indexing.
            raise ValueError(f"objective indexes must be in 1..{self.NUMBER_OF_TARGETS}, got {invalid}")

        super(RMFS, self).__init__()

        self.number_of_objectives = len(objectives_list)
        self.objectives_list = objectives_list
        self.obj_directions = [self.MINIMIZE for _ in range(self.number_of_objectives)]
        self.number_of_variables = n_pod
        self.number_of_constraints = 0

    def evaluate(self, solution: PermutationSolution) -> PermutationSolution:
        """Schedule the selected pods on the robots and fill in ``solution.objectives``."""
        # x[i, r, k] == 1: pod i is the k-th task carried by robot r.
        x = np.zeros((n_pod, n_robot, n_pod), dtype=int)
        # Time pod i is picked up by a robot; -1 means the pod is not moved.
        # Only needed by the optional constraint check mentioned below.
        tP = -1 * np.ones(n_pod)
        # Time pod i is put down by a robot; -1 means the pod is not moved.
        tD = -1 * np.ones(n_pod)
        # Time at which each robot finishes its current work.
        robot_condition = np.zeros(n_robot)
        # Number of tasks assigned to each robot so far.
        n_robot_work = np.zeros(n_robot, dtype=int)

        selected_pods = get_selected_pods(A, n_commodity_type, n_pod, B, solution.variables)
        for pod in selected_pods:
            robot = select_robot(robot_condition)
            tP[pod] = robot_condition[robot] + Tp[pod]
            tD[pod] = tP[pod] + Td[pod]
            robot_condition[robot] = robot_condition[robot] + Tp[pod] + Td[pod]
            x[pod, robot, n_robot_work[robot]] = 1
            n_robot_work[robot] = n_robot_work[robot] + 1

        y = get_y(tD, selected_pods)
        z = get_z(y, np.array(selected_pods), np.array(A), np.array(B))
        tH = get_tH(y, z, selected_pods, tD, Th)
        tO = get_tO(y, z, np.array(selected_pods), B, Th, tH)

        # Shared by targets 1 and 2; previously computed twice.
        finish_times = tH + Th * np.sum(np.sum(z, axis=1), axis=1)
        target = [round(max(finish_times), decimal_Th),
                  round(float(np.sum(finish_times)), decimal_Th),
                  np.sum(y),
                  round(g_target4(Tp, Td, x), decimal_Th),
                  round(sum(tO), decimal_Th)]

        # A feasibility check can be enabled here with
        # judge_constraint(x, y, z, tP, tD, tH, selected_pods).

        for i, objective in enumerate(self.objectives_list):
            solution.objectives[i] = target[objective - 1]

        return solution

    def create_solution(self) -> PermutationSolution:
        """Return a new solution whose variables are a random permutation of the pod indexes."""
        new_solution = PermutationSolution(
            number_of_variables=self.number_of_variables, number_of_objectives=self.number_of_objectives
        )
        new_solution.variables = random.sample(range(self.number_of_variables), k=self.number_of_variables)
        return new_solution

    @property
    def number_of_cities(self):
        """Alias of ``number_of_variables``."""
        return self.number_of_variables

    def get_name(self):
        return "RMFS"
if __name__ == "__main__":
    # Solve RMFS with MOEA/D for the chosen targets (1-based indexes, 1-5).
    # An alternative that was tried: objectives_list = [1, 2, 3, 5]
    objectives_list = [1, 2, 3, 4, 5]
    problem = RMFS(objectives_list)
    print("n_pod: ", problem.number_of_variables)

    max_evaluations = 10000
    algorithm = MOEAD(
        problem=problem,
        population_size=2500,
        # MOEAD annotates this parameter as DifferentialEvolutionCrossover;
        # a permutation crossover is passed instead, hence the noqa.
        crossover=PMXCrossover(0.8),  # noqa
        mutation=PermutationSwapMutation(1.0 / problem.number_of_variables),
        aggregative_function=Tschebycheff(dimension=problem.number_of_objectives),
        neighbor_size=20,
        neighbourhood_selection_probability=0.9,
        max_number_of_replaced_solutions=2,
        # Machine-specific path to the weight files; adjust to the local checkout.
        weight_files_path="E:/python/RMFS/jMetalPy-main/resources/MOEAD_weights",
        termination_criterion=StoppingByEvaluations(max_evaluations=max_evaluations),
    )
    algorithm.run()
    front = get_non_dominated_solutions(algorithm.get_result())

    # Save results to file
    print_function_values_to_file(front, "FUN." + algorithm.label)
    print_variables_to_file(front, "VAR." + algorithm.label)

    # The front is only plotted for two or three objectives.
    if 2 <= len(objectives_list) <= 3:
        plot_front = Plot(title='Pareto front approximation', axis_labels=['x', 'y'])
        # Label derived from the run instead of the copied 'NSGAII-ZDT1'.
        plot_front.plot(front, label=f"{algorithm.get_name()}-{problem.get_name()}")

    print(f"Algorithm: {algorithm.get_name()}")
    print(f"Problem: {problem.get_name()}")
    if len(objectives_list) == 1:
        print("Solution: {}".format(front[0].variables))
        print("Fitness: {}".format(front[0].objectives[0]))
    print(f"Computing time: {algorithm.total_computing_time}")
import random | |
from jmetal.lab.visualization import Plot | |
from jmetal.core.problem import PermutationProblem | |
from jmetal.core.solution import PermutationSolution | |
from parameter import * | |
from functions import * | |
import numpy as np | |
from jmetal.operator.crossover import PMXCrossover | |
from jmetal.operator.mutation import PermutationSwapMutation | |
from jmetal.util.termination_criterion import StoppingByEvaluations | |
from jmetal.util.solution import ( | |
get_non_dominated_solutions, | |
print_function_values_to_file, | |
print_variables_to_file, | |
) | |
from jmetal.algorithm.multiobjective.spea2 import SPEA2 | |
""" | |
Program to configure and run the NSGA-II algorithm configured with standard settings. | |
""" | |
class RMFS(PermutationProblem):
    """RMFS pod/robot scheduling problem encoded as a permutation of pods.

    A solution is a permutation of ``range(n_pod)``.  ``evaluate`` computes
    five candidate target values and copies the ones named in
    ``objectives_list`` (1-based) into the solution's objectives.
    """

    NUMBER_OF_TARGETS = 5  # length of the ``target`` list built in evaluate()

    def __init__(self, objectives_list):
        """
        :param objectives_list: 1-based indexes (1-5) of the targets to optimise, in objective order.
        :raises ValueError: If an index is outside 1..5.
        """
        invalid = [objective for objective in objectives_list
                   if not 1 <= objective <= self.NUMBER_OF_TARGETS]
        if invalid:
            # Without this check 0 or a negative index would silently select
            # a different target through negative list indexing.
            raise ValueError(f"objective indexes must be in 1..{self.NUMBER_OF_TARGETS}, got {invalid}")

        super(RMFS, self).__init__()

        self.number_of_objectives = len(objectives_list)
        self.objectives_list = objectives_list
        self.obj_directions = [self.MINIMIZE for _ in range(self.number_of_objectives)]
        self.number_of_variables = n_pod
        self.number_of_constraints = 0

    def evaluate(self, solution: PermutationSolution) -> PermutationSolution:
        """Schedule the selected pods on the robots and fill in ``solution.objectives``."""
        # x[i, r, k] == 1: pod i is the k-th task carried by robot r.
        x = np.zeros((n_pod, n_robot, n_pod), dtype=int)
        # Time pod i is picked up by a robot; -1 means the pod is not moved.
        # Only needed by the optional constraint check mentioned below.
        tP = -1 * np.ones(n_pod)
        # Time pod i is put down by a robot; -1 means the pod is not moved.
        tD = -1 * np.ones(n_pod)
        # Time at which each robot finishes its current work.
        robot_condition = np.zeros(n_robot)
        # Number of tasks assigned to each robot so far.
        n_robot_work = np.zeros(n_robot, dtype=int)

        selected_pods = get_selected_pods(A, n_commodity_type, n_pod, B, solution.variables)
        for pod in selected_pods:
            robot = select_robot(robot_condition)
            tP[pod] = robot_condition[robot] + Tp[pod]
            tD[pod] = tP[pod] + Td[pod]
            robot_condition[robot] = robot_condition[robot] + Tp[pod] + Td[pod]
            x[pod, robot, n_robot_work[robot]] = 1
            n_robot_work[robot] = n_robot_work[robot] + 1

        y = get_y(tD, selected_pods)
        z = get_z(y, np.array(selected_pods), np.array(A), np.array(B))
        tH = get_tH(y, z, selected_pods, tD, Th)
        tO = get_tO(y, z, np.array(selected_pods), B, Th, tH)

        # Shared by targets 1 and 2; previously computed twice.
        finish_times = tH + Th * np.sum(np.sum(z, axis=1), axis=1)
        target = [round(max(finish_times), decimal_Th),
                  round(float(np.sum(finish_times)), decimal_Th),
                  np.sum(y),
                  round(g_target4(Tp, Td, x), decimal_Th),
                  round(sum(tO), decimal_Th)]

        # A feasibility check can be enabled here with
        # judge_constraint(x, y, z, tP, tD, tH, selected_pods).

        for i, objective in enumerate(self.objectives_list):
            solution.objectives[i] = target[objective - 1]

        return solution

    def create_solution(self) -> PermutationSolution:
        """Return a new solution whose variables are a random permutation of the pod indexes."""
        new_solution = PermutationSolution(
            number_of_variables=self.number_of_variables, number_of_objectives=self.number_of_objectives
        )
        new_solution.variables = random.sample(range(self.number_of_variables), k=self.number_of_variables)
        return new_solution

    @property
    def number_of_cities(self):
        """Alias of ``number_of_variables``."""
        return self.number_of_variables

    def get_name(self):
        return "RMFS"
if __name__ == "__main__":
    # Solve RMFS with SPEA2 for the chosen targets (1-based indexes, 1-5).
    objectives_list = [1, 5]
    problem = RMFS(objectives_list)
    print("n_pod: ", problem.number_of_variables)

    max_evaluations = 2000
    algorithm = SPEA2(
        problem=problem,
        population_size=40,
        offspring_population_size=40,
        mutation=PermutationSwapMutation(1.0 / problem.number_of_variables),
        crossover=PMXCrossover(0.8),
        termination_criterion=StoppingByEvaluations(max_evaluations=max_evaluations),
    )
    algorithm.run()
    front = get_non_dominated_solutions(algorithm.get_result())

    # Save results to file
    print_function_values_to_file(front, "FUN." + algorithm.label)
    print_variables_to_file(front, "VAR." + algorithm.label)

    # The front is only plotted for two or three objectives.
    if 2 <= len(objectives_list) <= 3:
        plot_front = Plot(title='Pareto front approximation', axis_labels=['x', 'y'])
        # Label derived from the run instead of the copied 'NSGAII-ZDT1'.
        plot_front.plot(front, label=f"{algorithm.get_name()}-{problem.get_name()}")

    print(f"Algorithm: {algorithm.get_name()}")
    print(f"Problem: {problem.get_name()}")
    if len(objectives_list) == 1:
        print("Solution: {}".format(front[0].variables))
        print("Fitness: {}".format(front[0].objectives[0]))
    print(f"Computing time: {algorithm.total_computing_time}")