The strongest algorithm behind AI beating the world champions at Dota2 in 2019 is right here!
Introduction to the PG Algorithm
Actor: the component of the agent (Agent) that produces actions; Env: the environment the agent acts in; Reward Function: how the reward is computed, i.e. the metric used to evaluate each action.
A neural network is used to fit the policy function, and the policy network is optimized by computing the policy gradient. The state (State) produced by the environment, as a matrix or vector, is fed into the network, which outputs a probability for each action, and the action with the highest probability is selected.
The agent obtains a state (State) from the environment (Env) and, based on that state, the Actor picks the action expected to maximize the reward; executing that action in the environment yields a new state (next state). This repeats until the reward function yields nothing further or the episode's termination condition is reached, at which point the probability of the whole trajectory (the probability of one episode) can be computed. A minimal NumPy sketch of the resulting policy-gradient update is given below.
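To make this concrete, here is a minimal NumPy sketch of a vanilla policy-gradient (REINFORCE) update on a toy problem. The toy environment, the linear softmax policy, and all hyper-parameters are illustrative assumptions, not code from this article or from PARL.

# Minimal REINFORCE sketch (toy environment and all names are illustrative assumptions)
import numpy as np

rng = np.random.default_rng(0)
obs_dim, act_dim = 4, 2
theta = np.zeros((obs_dim, act_dim))  # parameters of a linear softmax policy

def softmax(x):
    x = x - x.max()
    e = np.exp(x)
    return e / e.sum()

def toy_env_step(state, action):
    # illustrative dynamics: random next state, reward favors action 0, 10% chance of ending
    next_state = rng.normal(size=obs_dim)
    reward = 1.0 if action == 0 else 0.0
    done = rng.random() < 0.1
    return next_state, reward, done

def run_episode():
    states, actions, rewards = [], [], []
    state, done = rng.normal(size=obs_dim), False
    while not done:
        probs = softmax(state @ theta)         # pi(a|s): one probability per action
        action = rng.choice(act_dim, p=probs)  # sample an action from the policy
        next_state, reward, done = toy_env_step(state, action)
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
    return states, actions, rewards

lr, gamma = 0.01, 0.99
for _ in range(50):
    states, actions, rewards = run_episode()
    # discounted return from each time step of the trajectory
    G, returns = 0.0, []
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    returns.reverse()
    baseline = np.mean(returns)  # subtract a baseline to reduce variance
    for s, a, R in zip(states, actions, returns):
        probs = softmax(s @ theta)
        grad_log_pi = -np.outer(s, probs)  # gradient of log pi(a|s) w.r.t. theta
        grad_log_pi[:, a] += s
        theta += lr * (R - baseline) * grad_log_pi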
Practical Tips for PG
When updating the policy with PG, the main tricks for avoiding common pitfalls are: add a baseline, weight each action appropriately, and introduce a discount (cost) factor.
Introduction to the PPO Algorithm
On-policy: the agent being trained learns while it interacts with the environment itself (the interacting agent is the agent being trained); Off-policy: the agent being trained learns by watching another agent interact (the interacting agent is not the agent being trained). PPO builds on an importance-sampling correction between the old and new policies while constraining how far they can drift apart; a small NumPy sketch of its clipped surrogate loss follows.
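The sketch below shows PPO's clipped surrogate objective, which re-weights samples collected by the old (interacting) policy with an importance ratio and clips that ratio to keep the update conservative. The helper name ppo_clip_objective and the example arrays are assumptions made here for illustration, not code from PARL.

import numpy as np

def ppo_clip_objective(logprob_new, logprob_old, advantages, epsilon=0.2):
    # importance ratio pi_new(a|s) / pi_old(a|s), computed from log-probabilities
    ratio = np.exp(logprob_new - logprob_old)
    unclipped = ratio * advantages
    clipped = np.clip(ratio, 1.0 - epsilon, 1.0 + epsilon) * advantages
    # PPO maximizes the element-wise minimum; the loss to minimize is its negative mean
    return -np.mean(np.minimum(unclipped, clipped))

# illustrative numbers only
logprob_old = np.array([-1.2, -0.7, -2.0])
logprob_new = np.array([-1.0, -0.9, -1.5])
advantages = np.array([0.5, -0.3, 1.2])
print(ppo_clip_objective(logprob_new, logprob_old, advantages))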
Hands-on PPO with the PaddlePaddle PARL Framework
Practice 1: the CartPole task
class PolicyModel(parl.Model):  # class header restored here; named to pair with the ValueModel below
    def __init__(self, obs_dim, act_dim, init_logvar):
        super(PolicyModel, self).__init__()
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        hid1_size = obs_dim * 10
        hid3_size = act_dim * 10
        hid2_size = int(np.sqrt(hid1_size * hid3_size))
        self.lr = 9e-4 / np.sqrt(hid2_size)

        self.fc1 = layers.fc(size=hid1_size, act='tanh')
        self.fc2 = layers.fc(size=hid2_size, act='tanh')
        self.fc3 = layers.fc(size=hid3_size, act='tanh')
        self.fc4 = layers.fc(size=act_dim, act='tanh')

        self.logvars = layers.create_parameter(
            shape=[act_dim],
            dtype='float32',
            default_initializer=fluid.initializer.ConstantInitializer(
                init_logvar))

    def policy(self, obs):  # policy: returns the Gaussian action means and log-variances
        hid1 = self.fc1(obs)
        hid2 = self.fc2(hid1)
        hid3 = self.fc3(hid2)
        means = self.fc4(hid3)
        logvars = self.logvars()
        return means, logvars

    def sample(self, obs):  # sampling: draw an action from the Gaussian policy
        means, logvars = self.policy(obs)
        sampled_act = means + (
            layers.exp(logvars / 2.0) *  # stddev
            layers.gaussian_random(shape=(self.act_dim, ), dtype='float32'))
        return sampled_act
class ValueModel(parl.Model):
    def __init__(self, obs_dim, act_dim):
        super(ValueModel, self).__init__()
        hid1_size = obs_dim * 10
        hid3_size = 5
        hid2_size = int(np.sqrt(hid1_size * hid3_size))
        self.lr = 1e-2 / np.sqrt(hid2_size)

        self.fc1 = layers.fc(size=hid1_size, act='tanh')
        self.fc2 = layers.fc(size=hid2_size, act='tanh')
        self.fc3 = layers.fc(size=hid3_size, act='tanh')
        self.fc4 = layers.fc(size=1)

    def value(self, obs):
        hid1 = self.fc1(obs)
        hid2 = self.fc2(hid1)
        hid3 = self.fc3(hid2)
        V = self.fc4(hid3)
        V = layers.squeeze(V, axes=[])
        return V
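The training script further down builds a PPOModel(obs_dim, act_dim) whose definition is not included in this excerpt. A minimal sketch of such a wrapper, assuming it simply combines the PolicyModel and ValueModel above and exposes their learning rates, could look like this (the real PARL example may differ in details):

class PPOModel(parl.Model):
    # sketch of the wrapper assumed by the training script; the init_logvar default is an assumption
    def __init__(self, obs_dim, act_dim, init_logvar=-1.0):
        super(PPOModel, self).__init__()
        self.policy_model = PolicyModel(obs_dim, act_dim, init_logvar)
        self.value_model = ValueModel(obs_dim, act_dim)
        self.policy_lr = self.policy_model.lr
        self.value_lr = self.value_model.lr

    def policy(self, obs):
        return self.policy_model.policy(obs)

    def policy_sample(self, obs):
        return self.policy_model.sample(obs)

    def value(self, obs):
        return self.value_model.value(obs)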
Building the Agent:
class PPOAgent(parl.Agent):  # class header restored to match the PPOAgent constructed in the training script
    def __init__(self, algorithm, obs_dim, act_dim, kl_targ, loss_type, beta=1.0, epsilon=0.2, policy_learn_times=20, value_learn_times=10, value_batch_size=256):
        # parameter initialization (omitted)
    def build_program(self):  # under the static graph, build the programs and define their inputs and outputs
        self.policy_predict_program = fluid.Program()
        self.policy_sample_program = fluid.Program()
        self.policy_learn_program = fluid.Program()
        self.value_predict_program = fluid.Program()
        self.value_learn_program = fluid.Program()

    def policy_sample(self, obs):
        # run the policy network to get a sampled action (body omitted)
        return sampled_act

    def policy_predict(self, obs):
        # run the policy network to get the predicted action means (body omitted)
        return means

    def value_predict(self, obs):
        # run the value network to get the predicted value (body omitted)
        return value
    # update the policy with the PPO algorithm
    def policy_learn(self, obs, actions, advantages):
        self.alg.sync_old_policy()
        all_loss, all_kl = [], []
        for _ in range(self.policy_learn_times):
            loss, kl = self._batch_policy_learn(obs, actions, advantages)
            all_loss.append(loss)
            all_kl.append(kl)
        if self.loss_type == 'KLPEN':
            # adaptive KL penalty coefficient
            if kl > self.kl_targ * 2:
                self.beta = 1.5 * self.beta
            elif kl < self.kl_targ / 2:
                self.beta = self.beta / 1.5
        return np.mean(all_loss), np.mean(all_kl)
    # update the value network (fit it to the current batch plus the previous batch)
    def value_learn(self, obs, value):
        data_size = obs.shape[0]
        if self.value_learn_buffer is None:
            obs_train, value_train = obs, value
        else:
            obs_train = np.concatenate([obs, self.value_learn_buffer[0]])
            value_train = np.concatenate([value, self.value_learn_buffer[1]])
        self.value_learn_buffer = (obs, value)
        all_loss = []
        for _ in range(self.value_learn_times):
            random_ids = np.arange(obs_train.shape[0])
            np.random.shuffle(random_ids)
            shuffle_obs_train = obs_train[random_ids]
            shuffle_value_train = value_train[random_ids]
            start = 0
            while start < data_size:
                end = start + self.value_batch_size
                value_loss = self._batch_value_learn(
                    shuffle_obs_train[start:end, :],
                    shuffle_value_train[start:end])
                all_loss.append(value_loss)
                start += self.value_batch_size
        return np.mean(all_loss)
class PPO(parl.Algorithm):  # class header restored; this is the parl.algorithms.PPO used in the training script
    def __init__(self, model, act_dim=None, policy_lr=None, value_lr=None, epsilon=0.2):
        # model initialization (omitted)

    # log-probability of actions under the diagonal Gaussian policy (constant term dropped)
    def _calc_logprob(self, actions, means, logvars):
        exp_item = layers.elementwise_div(
            layers.square(actions - means), layers.exp(logvars), axis=1)
        exp_item = -0.5 * layers.reduce_sum(exp_item, dim=1)
        vars_item = -0.5 * layers.reduce_sum(logvars)
        logprob = exp_item + vars_item
        return logprob

    # compute the KL divergence between the old and new Gaussian policies
    def _calc_kl(self, means, logvars, old_means, old_logvars):
        log_det_cov_old = layers.reduce_sum(old_logvars)
        log_det_cov_new = layers.reduce_sum(logvars)
        tr_old_new = layers.reduce_sum(layers.exp(old_logvars - logvars))
        kl = 0.5 * (layers.reduce_sum(
            layers.square(means - old_means) / layers.exp(logvars), dim=1) + (
                log_det_cov_new - log_det_cov_old) + tr_old_new - self.act_dim)
        return kl

    def policy_learn(self, obs, actions, advantages, beta=None):
        old_means, old_logvars = self.old_policy_model.policy(obs)
        old_means.stop_gradient = True
        old_logvars.stop_gradient = True
        old_logprob = self._calc_logprob(actions, old_means, old_logvars)
        means, logvars = self.model.policy(obs)
        logprob = self._calc_logprob(actions, means, logvars)
        kl = self._calc_kl(means, logvars, old_means, old_logvars)
        kl = layers.reduce_mean(kl)
        # the surrogate-loss computation that follows is omitted in this excerpt
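As a side check outside the framework, the two Gaussian formulas used above can be reproduced in plain NumPy. The helper names logprob_np and kl_np and the random test numbers below are assumptions made for illustration; they mirror _calc_logprob (which drops the constant -0.5 * k * log(2*pi)) and _calc_kl (KL(old || new) for diagonal Gaussians).

import numpy as np

def logprob_np(actions, means, logvars):
    # mirrors _calc_logprob: diagonal-Gaussian log-density without the -0.5*k*log(2*pi) constant
    exp_item = -0.5 * np.sum(np.square(actions - means) / np.exp(logvars), axis=1)
    return exp_item - 0.5 * np.sum(logvars)

def kl_np(means, logvars, old_means, old_logvars):
    # mirrors _calc_kl: KL(old || new) between diagonal Gaussians
    act_dim = means.shape[1]
    return 0.5 * (np.sum(np.square(means - old_means) / np.exp(logvars), axis=1)
                  + (np.sum(logvars) - np.sum(old_logvars))
                  + np.sum(np.exp(old_logvars - logvars))
                  - act_dim)

rng = np.random.default_rng(0)
actions = rng.normal(size=(4, 3))
means, old_means = rng.normal(size=(4, 3)), rng.normal(size=(4, 3))
logvars, old_logvars = rng.normal(size=3), rng.normal(size=3)
print(logprob_np(actions, means, logvars))
print(kl_np(means, logvars, old_means, old_logvars))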
env = ContinuousCartPoleEnv()
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
obs_dim += 1  # add 1 to obs_dim for the time-step feature (presumably to make it easier to introduce the discount factor)
scaler = Scaler(obs_dim)

model = PPOModel(obs_dim, act_dim)
alg = parl.algorithms.PPO(
    model,
    act_dim=act_dim,
    policy_lr=model.policy_lr,
    value_lr=model.value_lr)
agent = PPOAgent(
    alg, obs_dim, act_dim, kl_targ, loss_type=loss_type)

# run a few episodes to initialize the scaler
collect_trajectories(env, agent, scaler, episodes=5)
test_flag = 0
total_steps = 0
while total_steps < train_total_steps:
    trajectories = collect_trajectories(
        env, agent, scaler, episodes=episodes_per_batch)
    total_steps += sum([t['obs'].shape[0] for t in trajectories])
    total_train_rewards = sum([np.sum(t['rewards']) for t in trajectories])

    # build the training data
    train_obs, train_actions, train_advantages, train_discount_sum_rewards = build_train_data(
        trajectories, agent)

    # compute policy_loss and kl, then update the value network
    policy_loss, kl = agent.policy_learn(train_obs, train_actions,
                                         train_advantages)
    value_loss = agent.value_learn(train_obs, train_discount_sum_rewards)

    if total_steps // test_every_steps >= test_flag:
        while total_steps // test_every_steps >= test_flag:
            test_flag += 1
        eval_reward = run_evaluate_episode(env, agent, scaler)
[07-23 12:00:44 MainThread @<ipython-input-7-710321de7941>:188] Steps 1001984, Evaluate reward: 23285.0
Source files for the second example: mujoco_model.py, mujoco_agent.py, scaler.py, train.py
Summary
At the start of the article we looked at how the PG (Policy Gradient) algorithm solves RL problems over continuous action spaces, at the strengths and weaknesses of PG when updating the policy, and at the corresponding tricks for avoiding its pitfalls, such as adding a baseline, weighting each action appropriately, and introducing a discount (cost) factor. We then covered the two common ways of training reinforcement-learning models, on-policy and off-policy, which led to the PPO algorithm. Finally, building on the PaddlePaddle PARL framework, we implemented PPO hands-on and completed two tasks: CartPole and quadrotor hover.