【他山之石】Federated Learning: A PyTorch Implementation of FedAvg
"Stones from other hills may serve to polish jade": only by standing on the shoulders of giants can we see higher and travel farther, and on the road of research a favorable wind helps us move faster still. To that end, we collect practical code links, datasets, software, and programming tips in this "他山之石" column to help you ride the wind and waves. Stay tuned.
Author's page: https://www.zhihu.com/people/qu-xiang-mou
In federated learning, each client trains a model locally on its own data and sends only the learned model parameters to a trusted server, which produces the main model using techniques such as differential privacy and secure aggregation. The trusted server then sends the aggregated main model back to the clients, and this process is repeated.
Here, a simple implementation with IID (independent and identically distributed) data is prepared to demonstrate how the parameters of hundreds of different models running on different nodes can be combined with the FedAvg method, and whether the resulting model gives reasonable results. The implementation uses the MNIST dataset, which consists of 28 × 28 pixel grayscale images of the digits 0 through 9.
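The snippets below rely on a few imports and hyperparameters that this excerpt never shows. A minimal setup consistent with how those names are used later might look like the following; the values are illustrative placeholders, not taken from the original notebook:

import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

# Illustrative values only -- the original notebook may use different ones.
number_of_samples = 100  # number of nodes (clients)
learning_rate = 0.01
momentum = 0.9
numEpoch = 10            # local epochs per communication round
batch_size = 32
train_amount = 4000      # samples taken per label for the train split
valid_amount = 800
test_amount = 800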
01
FedAvg Training Process
Since the parameters of the main model and of all local models at the nodes are randomly initialized, they will all differ from one another. Therefore, the main model sends its parameters to the nodes before the local models at the nodes are trained.
The nodes use these parameters to train their local models on their own data.
Each node updates its parameters while training its own model. When training completes, each node sends its parameters back to the main model.
The main model takes the average of these parameters, sets it as its new weights, and passes the result back to the nodes for the next iteration.
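Taken together, one communication round of FedAvg amounts to the following three calls, built from the helper functions introduced in the next section (the same calls appear in the training loop at the end of this post):

# One FedAvg communication round, using the helpers defined below
model_dict = send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples)  # broadcast main model
start_train_end_node_process_without_print(number_of_samples)  # local training at each node
main_model = set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples)  # average and update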
02
Functions for Distributing Data to Nodes
split_and_shuffle_labels(y_data, seed, amount): The labels in the dataset do not all contain the same number of samples, so to distribute the data to the nodes as IID we must take an equal amount of data for each label.
This function keeps the sample counts equal and shuffles the samples within each label. (What is shuffled here are the data indices; we will use these indices later when retrieving the data.)
def split_and_shuffle_labels(y_data, seed, amount):
    y_data = pd.DataFrame(y_data, columns=["labels"])
    y_data["i"] = np.arange(len(y_data))
    label_dict = dict()
    for i in range(10):
        var_name = "label" + str(i)
        label_info = y_data[y_data["labels"] == i]
        np.random.seed(seed)
        label_info = np.random.permutation(label_info)
        label_info = label_info[0:amount]
        label_info = pd.DataFrame(label_info, columns=["labels", "i"])
        label_dict.update({var_name: label_info})
    return label_dict
get_iid_subsamples_indices(label_dict, number_of_samples, amount): splits the indices of each label evenly across the nodes, so that every node holds an equal number of samples of every label.
def get_iid_subsamples_indices(label_dict, number_of_samples, amount):
    sample_dict = dict()
    batch_size = int(math.floor(amount / number_of_samples))
    for i in range(number_of_samples):
        sample_name = "sample" + str(i)
        dumb = pd.DataFrame()
        for j in range(10):
            label_name = str("label") + str(j)
            a = label_dict[label_name][i * batch_size:(i + 1) * batch_size]
            dumb = pd.concat([dumb, a], axis=0)
        dumb.reset_index(drop=True, inplace=True)
        sample_dict.update({sample_name: dumb})
    return sample_dict
create_iid_subsamples(sample_dict, x_data, y_data, x_name, y_name): distributes the data to the nodes.
def create_iid_subsamples(sample_dict, x_data, y_data, x_name, y_name):
    x_data_dict = dict()
    y_data_dict = dict()
    for i in range(len(sample_dict)):  # len(sample_dict) = number of samples
        xname = x_name + str(i)
        yname = y_name + str(i)
        sample_name = "sample" + str(i)
        indices = np.sort(np.array(sample_dict[sample_name]["i"]))
        x_info = x_data[indices, :]
        x_data_dict.update({xname: x_info})
        y_info = y_data[indices]
        y_data_dict.update({yname: y_info})
    return x_data_dict, y_data_dict
Functions for FedAvg
create_model_optimizer_criterion_dict(number_of_samples): creates a model, an optimizer, and a loss function for each node.
def create_model_optimizer_criterion_dict(number_of_samples):
    model_dict = dict()
    optimizer_dict = dict()
    criterion_dict = dict()
    for i in range(number_of_samples):
        model_name = "model" + str(i)
        model_info = Net2nn()
        model_dict.update({model_name: model_info})
        optimizer_name = "optimizer" + str(i)
        optimizer_info = torch.optim.SGD(model_info.parameters(), lr=learning_rate, momentum=momentum)
        optimizer_dict.update({optimizer_name: optimizer_info})
        criterion_name = "criterion" + str(i)
        criterion_info = nn.CrossEntropyLoss()
        criterion_dict.update({criterion_name: criterion_info})
    return model_dict, optimizer_dict, criterion_dict
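The Net2nn class itself is not shown in this excerpt. Judging from the fc1/fc2/fc3 layers accessed by the aggregation functions below, it is a simple two-hidden-layer MLP; a sketch consistent with that usage (the layer widths are assumptions, not taken from the notebook) would be:

class Net2nn(nn.Module):
    # Assumed architecture: a 784-200-200-10 MLP whose fc1/fc2/fc3
    # attributes match those referenced by the aggregation functions.
    def __init__(self):
        super(Net2nn, self).__init__()
        self.fc1 = nn.Linear(784, 200)
        self.fc2 = nn.Linear(200, 200)
        self.fc3 = nn.Linear(200, 10)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # raw logits, as expected by nn.CrossEntropyLoss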
get_averaged_weights(model_dict, number_of_samples): computes the average of the weights across the individual nodes.
def get_averaged_weights(model_dict, number_of_samples):
    fc1_mean_weight = torch.zeros(size=model_dict[name_of_models[0]].fc1.weight.shape)
    fc1_mean_bias = torch.zeros(size=model_dict[name_of_models[0]].fc1.bias.shape)
    fc2_mean_weight = torch.zeros(size=model_dict[name_of_models[0]].fc2.weight.shape)
    fc2_mean_bias = torch.zeros(size=model_dict[name_of_models[0]].fc2.bias.shape)
    fc3_mean_weight = torch.zeros(size=model_dict[name_of_models[0]].fc3.weight.shape)
    fc3_mean_bias = torch.zeros(size=model_dict[name_of_models[0]].fc3.bias.shape)
    with torch.no_grad():
        for i in range(number_of_samples):
            fc1_mean_weight += model_dict[name_of_models[i]].fc1.weight.data.clone()
            fc1_mean_bias += model_dict[name_of_models[i]].fc1.bias.data.clone()
            fc2_mean_weight += model_dict[name_of_models[i]].fc2.weight.data.clone()
            fc2_mean_bias += model_dict[name_of_models[i]].fc2.bias.data.clone()
            fc3_mean_weight += model_dict[name_of_models[i]].fc3.weight.data.clone()
            fc3_mean_bias += model_dict[name_of_models[i]].fc3.bias.data.clone()
        fc1_mean_weight = fc1_mean_weight / number_of_samples
        fc1_mean_bias = fc1_mean_bias / number_of_samples
        fc2_mean_weight = fc2_mean_weight / number_of_samples
        fc2_mean_bias = fc2_mean_bias / number_of_samples
        fc3_mean_weight = fc3_mean_weight / number_of_samples
        fc3_mean_bias = fc3_mean_bias / number_of_samples
    return fc1_mean_weight, fc1_mean_bias, fc2_mean_weight, fc2_mean_bias, fc3_mean_weight, fc3_mean_bias
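Note that get_averaged_weights hard-codes the three layers of Net2nn. As a side note, the same average can be computed generically over state_dict so that it works for any architecture; the following is an alternative sketch, not the author's code:

def get_averaged_state_dict(model_dict, number_of_samples):
    # Generic alternative: average every parameter tensor across nodes,
    # independent of the layer names.
    with torch.no_grad():
        avg = {k: v.clone() for k, v in model_dict[name_of_models[0]].state_dict().items()}
        for i in range(1, number_of_samples):
            for k, v in model_dict[name_of_models[i]].state_dict().items():
                avg[k] += v
        for k in avg:
            avg[k] /= number_of_samples
    return avg

# The main model could then be updated with:
# main_model.load_state_dict(get_averaged_state_dict(model_dict, number_of_samples))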
set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples): takes the averaged weights of the nodes and sets them as the new weights of the main model.
def set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples):
    fc1_mean_weight, fc1_mean_bias, fc2_mean_weight, fc2_mean_bias, fc3_mean_weight, fc3_mean_bias = get_averaged_weights(model_dict, number_of_samples=number_of_samples)
    with torch.no_grad():
        main_model.fc1.weight.data = fc1_mean_weight.data.clone()
        main_model.fc2.weight.data = fc2_mean_weight.data.clone()
        main_model.fc3.weight.data = fc3_mean_weight.data.clone()
        main_model.fc1.bias.data = fc1_mean_bias.data.clone()
        main_model.fc2.bias.data = fc2_mean_bias.data.clone()
        main_model.fc3.bias.data = fc3_mean_bias.data.clone()
    return main_model
compare_local_and_merged_model_performance(number_of_samples): compares the accuracy of the main model with that of the local models running on the nodes.
def compare_local_and_merged_model_performance(number_of_samples):
    accuracy_table = pd.DataFrame(data=np.zeros((number_of_samples, 3)), columns=["sample", "local_ind_model", "merged_main_model"])
    for i in range(number_of_samples):
        test_ds = TensorDataset(x_test_dict[name_of_x_test_sets[i]], y_test_dict[name_of_y_test_sets[i]])
        test_dl = DataLoader(test_ds, batch_size=batch_size * 2)
        model = model_dict[name_of_models[i]]
        criterion = criterion_dict[name_of_criterions[i]]
        optimizer = optimizer_dict[name_of_optimizers[i]]
        individual_loss, individual_accuracy = validation(model, test_dl, criterion)
        main_loss, main_accuracy = validation(main_model, test_dl, main_criterion)
        accuracy_table.loc[i, "sample"] = "sample " + str(i)
        accuracy_table.loc[i, "local_ind_model"] = individual_accuracy
        accuracy_table.loc[i, "merged_main_model"] = main_accuracy
    return accuracy_table
send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples): sends the parameters of the main model to the nodes.
def send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples):
    with torch.no_grad():
        for i in range(number_of_samples):
            model_dict[name_of_models[i]].fc1.weight.data = main_model.fc1.weight.data.clone()
            model_dict[name_of_models[i]].fc2.weight.data = main_model.fc2.weight.data.clone()
            model_dict[name_of_models[i]].fc3.weight.data = main_model.fc3.weight.data.clone()
            model_dict[name_of_models[i]].fc1.bias.data = main_model.fc1.bias.data.clone()
            model_dict[name_of_models[i]].fc2.bias.data = main_model.fc2.bias.data.clone()
            model_dict[name_of_models[i]].fc3.bias.data = main_model.fc3.bias.data.clone()
    return model_dict
start_train_end_node_process_without_print(number_of_samples): trains the individual local models at the nodes.
def start_train_end_node_process_without_print(number_of_samples):
    for i in range(number_of_samples):
        train_ds = TensorDataset(x_train_dict[name_of_x_train_sets[i]], y_train_dict[name_of_y_train_sets[i]])
        train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
        test_ds = TensorDataset(x_test_dict[name_of_x_test_sets[i]], y_test_dict[name_of_y_test_sets[i]])
        test_dl = DataLoader(test_ds, batch_size=batch_size * 2)
        model = model_dict[name_of_models[i]]
        criterion = criterion_dict[name_of_criterions[i]]
        optimizer = optimizer_dict[name_of_optimizers[i]]
        for epoch in range(numEpoch):
            train_loss, train_accuracy = train(model, train_dl, criterion, optimizer)
            test_loss, test_accuracy = validation(model, test_dl, criterion)
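The train and validation helpers called above (and in the centralized baseline below) are not shown in this excerpt either. Minimal sketches consistent with how they are used -- each returns an average loss and an accuracy over its DataLoader -- could look like this:

def train(model, train_dl, criterion, optimizer):
    # One epoch over train_dl; returns average loss and accuracy.
    model.train()
    total_loss, correct, total = 0.0, 0, 0
    for x, y in train_dl:
        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * y.size(0)
        correct += (output.argmax(dim=1) == y).sum().item()
        total += y.size(0)
    return total_loss / total, correct / total

def validation(model, test_dl, criterion):
    # Evaluation pass; no gradient updates.
    model.eval()
    total_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for x, y in test_dl:
            output = model(x)
            total_loss += criterion(output, y).item() * y.size(0)
            correct += (output.argmax(dim=1) == y).sum().item()
            total += y.size(0)
    return total_loss / total, correct / total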
03
How well does a model trained on centralized data perform?
centralized_model = Net2nn()
centralized_optimizer = torch.optim.SGD(centralized_model.parameters(), lr=0.01, momentum=0.9)
centralized_criterion = nn.CrossEntropyLoss()
train_ds = TensorDataset(x_train, y_train)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
valid_ds = TensorDataset(x_valid, y_valid)
valid_dl = DataLoader(valid_ds, batch_size=batch_size * 2)
test_ds = TensorDataset(x_test, y_test)
test_dl = DataLoader(test_ds, batch_size=batch_size * 2)
print("------ Centralized Model ------")
for epoch in range(numEpoch):
    central_train_loss, central_train_accuracy = train(centralized_model, train_dl, centralized_criterion, centralized_optimizer)
    central_test_loss, central_test_accuracy = validation(centralized_model, test_dl, centralized_criterion)
    print("epoch: {:3.0f}".format(epoch + 1) + " | train accuracy: {:7.4f}".format(central_train_accuracy) + " | test accuracy: {:7.4f}".format(central_test_accuracy))
print("------ Training finished ------")
------ Centralized Model ------
epoch: 1 | train accuracy: 0.8743 | test accuracy: 0.9437
epoch: 2 | train accuracy: 0.9567 | test accuracy: 0.9654
epoch: 3 | train accuracy: 0.9712 | test accuracy: 0.9701
epoch: 4 | train accuracy: 0.9785 | test accuracy: 0.9738
epoch: 5 | train accuracy: 0.9834 | test accuracy: 0.9713
epoch: 6 | train accuracy: 0.9864 | test accuracy: 0.9768
epoch: 7 | train accuracy: 0.9898 | test accuracy: 0.9763
epoch: 8 | train accuracy: 0.9923 | test accuracy: 0.9804
epoch: 9 | train accuracy: 0.9941 | test accuracy: 0.9784
epoch: 10 | train accuracy: 0.9959 | test accuracy: 0.9792
------ Training finished ------
The goal here is to compare the performance of the main model, formed by combining the parameters of local models that were each trained on their own data, against a centralized model trained on all of the training data.
Distributing the data to the nodes
label_dict_train=split_and_shuffle_labels(y_data=y_train, seed=1, amount=train_amount)
sample_dict_train=get_iid_subsamples_indices(label_dict=label_dict_train, number_of_samples=number_of_samples, amount=train_amount)
x_train_dict, y_train_dict = create_iid_subsamples(sample_dict=sample_dict_train, x_data=x_train, y_data=y_train, x_name="x_train", y_name="y_train")
label_dict_valid = split_and_shuffle_labels(y_data=y_valid, seed=1, amount=valid_amount)
sample_dict_valid = get_iid_subsamples_indices(label_dict=label_dict_valid, number_of_samples=number_of_samples, amount=valid_amount)
x_valid_dict, y_valid_dict = create_iid_subsamples(sample_dict=sample_dict_valid, x_data=x_valid, y_data=y_valid, x_name="x_valid", y_name="y_valid")
label_dict_test = split_and_shuffle_labels(y_data=y_test, seed=1, amount=test_amount)
sample_dict_test = get_iid_subsamples_indices(label_dict=label_dict_test, number_of_samples=number_of_samples, amount=test_amount)
x_test_dict, y_test_dict = create_iid_subsamples(sample_dict=sample_dict_test, x_data=x_test, y_data=y_test, x_name="x_test", y_name="y_test")
Creating the main model
main_model = Net2nn()
main_optimizer = torch.optim.SGD(main_model.parameters(), lr=learning_rate, momentum=0.9)
main_criterion = nn.CrossEntropyLoss()
Defining the models, optimizers, and loss functions at the nodes
model_dict, optimizer_dict, criterion_dict = create_model_optimizer_criterion_dict(number_of_samples)
name_of_x_train_sets=list(x_train_dict.keys())
name_of_y_train_sets=list(y_train_dict.keys())
name_of_x_valid_sets=list(x_valid_dict.keys())
name_of_y_valid_sets=list(y_valid_dict.keys())
name_of_x_test_sets=list(x_test_dict.keys())
name_of_y_test_sets=list(y_test_dict.keys())
name_of_models=list(model_dict.keys())
name_of_optimizers=list(optimizer_dict.keys())
name_of_criterions=list(criterion_dict.keys())
Sending the parameters of the main model to the nodes
Since the parameters of the main model and of all local models at the nodes are randomly initialized, they will all differ from one another. Therefore, the main model sends its parameters to the nodes before the local models at the nodes are trained.
model_dict=send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples)
start_train_end_node_process_without_print(number_of_samples)
before_test_loss, before_test_accuracy = validation(main_model, test_dl, main_criterion)
main_model= set_averaged_weights_as_main_model_weights_and_update_main_model(main_model,model_dict, number_of_samples)
after_test_loss, after_test_accuracy = validation(main_model, test_dl, main_criterion)
print("Before 1st iteration main model accuracy on all test data: {:7.4f}".format(before_test_accuracy))
print("After 1st iteration main model accuracy on all test data: {:7.4f}".format(after_test_accuracy))
print("Centralized model accuracy on all test data: {:7.4f}".format(central_test_accuracy))
Before 1st iteration main model accuracy on all test data: 0.1180
After 1st iteration main model accuracy on all test data: 0.8529
Centralized model accuracy on all test data: 0.9790
Performance of the main model when the iteration is repeated for 10 more rounds:
for i in range(10):
    model_dict = send_main_model_to_nodes_and_update_model_dict(main_model, model_dict, number_of_samples)
    start_train_end_node_process_without_print(number_of_samples)
    main_model = set_averaged_weights_as_main_model_weights_and_update_main_model(main_model, model_dict, number_of_samples)
    test_loss, test_accuracy = validation(main_model, test_dl, main_criterion)
    print("Iteration", str(i + 2), ": main_model accuracy on all test data: {:7.4f}".format(test_accuracy))
Iteration 2 : main_model accuracy on all test data: 0.8928
Iteration 3 : main_model accuracy on all test data: 0.9073
Iteration 4 : main_model accuracy on all test data: 0.9150
Iteration 5 : main_model accuracy on all test data: 0.9209
Iteration 6 : main_model accuracy on all test data: 0.9273
Iteration 7 : main_model accuracy on all test data: 0.9321
Iteration 8 : main_model accuracy on all test data: 0.9358
Iteration 9 : main_model accuracy on all test data: 0.9382
Iteration 10 : main_model accuracy on all test data: 0.9411
Iteration 11 : main_model accuracy on all test data: 0.9431
Original article: https://towardsdatascience.com/federated-learning-a-simple-implementation-of-fedavg-federated-averaging-with-pytorch-90187c9c9577
GitHub source: https://github.com/eceisik/fl_public/blob/master/fedavg_mnist_iid.ipynb