程序员如何解决并发冲突的难题?
作者 | 羽生结弦
责编 | 胡雪蕊
出品 | CSDN(ID: CSDNnews)
在大多数的应用中都会出现客户端同时发送多个请求对同一条数据就行修改,这个时候就会出现并发冲突。我们一般的做法会有如下两种:
1. 乐观并发
所谓的乐观并发就是多个请求同时对同一条数据的更新,只有最后一个更新请求会被保存,其他更新请求将会被抛弃。
2. 悲观并发
所谓悲观并发就是多个请求同时对同一条数据的更新,只有当前更新请求完成或者被抛弃,才会执行下一个更新请求,如果当前更新请求未完成或者未被抛弃,那么后面所有的更新请求将会被阻塞。
通过上面的简单讲解我们简单的了解了如何处理并发请求,那么下面我们来看一下上面两种做法的具体讲解和实现。
方法一
在 Entity Framework 中,默认的解决方案是乐观并发,原因是当出现并发情况的时候,内部没有任何对其他客户端访问同一行数据的限制。我们来看一下例子,我们在数据库中存有一条数据,数据如下图所示:
class Program
{
static void Main(string[] args)
{
int userId = 1;
using (var db = new EfContext())
{
using (var ef = new EfContext())
{
User user1 = db.Users.FirstOrDefault(p => p.Id == userId);
User user2= ef.Users.FirstOrDefault(p => p.Id == userId);
user1.Name = "李四";
db.SaveChanges();
user2.Name = "王五";
ef.SaveChanges();
}
}
}
}
上述操作发生了什么呢?我们来看一下,首先我们利用 db 从数据库中读取了 id 等于1的人员信息,此时该人员信息为张三,然后我们将 Name 值改为李四,并且提交到了数据库,在这个时候,数据库中的Name值将不再是张三,而是李四。接着我们再将 user2 的 Name 值修改为王五,并提交的数据库,这个时候数据库的 Name 列的值变为了王五。上述情况下,Entity Framework 将修改转换为 update 语句时是利用主键来定位指定行,因此上面两次操作都会成功,只不过最后一次修改的数据会最终持久化到数据库中。但是这种方式存在一个巨大的隐患,例如在门票预售系统中,门票的数量是有限制的,购票人数超过门票数量限制将会禁止购买。如果利用 Entity Framework 默认的乐观并发模式,每次有并发请求购票时,每个请求都会减去门票数量,并且向数据库中插入一条购票信息,这样一来永远是最后一个请求的数据会持久化到数据库中,这样就造成了门票预约人数超过了门票的限制数量。
针对上面所说的问题,我么可以利用如下两种方式来解决:
1. 并发 Token
Property(p => p.Name).IsConcurrencyToken();
通过行版本设置,我们需要为实体添加一个行版本子字节数组,代码如下:
public byte[] RowVersion { get; set; }
Property(p => p.RowVersion).IsRowVersion();
class Program
{
static void Main(string[] args)
{
int userId = 1;
using (var db = new EfContext())
{
using (var ef = new EfContext())
{
User user1 = db.Users.FirstOrDefault(p => p.Id == userId);
User user2= ef.Users.FirstOrDefault(p => p.Id == userId);
user1.Name = "李四";
db.SaveChanges();
try
{
user2.Name = "王五";
ef.SaveChanges();
}
catch (DbUpdateConcurrencyException e)
{
foreach (var item in e.Entries)
{
item.Reload();
ef.SaveChanges();
}
}
}
}
}
}
try
{
//more code
}
catch (DbUpdateConcurrencyException e)
{
foreach (var item in e.Entries)
{
//原始值
var ov = item.OriginalValues.ToObject();
//更新后数据库值
var dv = item.GetDatabaseValues().ToObject();
// 当前值
var nv = item.CurrentValues.ToObject();
}
}
try
{
//more code
}
catch (DbUpdateConcurrencyException e)
{
foreach (var item in e.Entries)
{
Object dv = item.GetDatabaseValues().ToObject();
item.OriginalValues.SetValues(dv);
ef.SaveChanges();
}
}
方法二
上一小节中我们提到了客户端获胜、数据库获胜以及数据库和客户端合并获胜,并且讲解了原始值和更新后的数据库值以及当前值从哪里获得的。在这一节将利用客户端获胜、数据库获胜以及客户端和数据库合并获胜处理并发的方法。
1. 客户端获胜
当调用 SaveChanges 方法时,如果存在并发冲突将会引发 DbUpdateConcurrencyException 异常,那么这个时候我们将调用 handleDbUpdateConcurrencyException 函数来处理异常并正确解决冲突,最后在调用 SaveChanges 方法重试提交数据。如果依然排除 DbUpdateConcurrencyException 异常,将不在进行处理。我们来看以下代码:
class Program
{
static void Main(string[] args)
{
int userId = 1;
using (var db = new EfContext())
{
using (var ef = new EfContext())
{
User user1 = db.Users.FirstOrDefault(p => p.Id == userId);
User user2 = ef.Users.FirstOrDefault(p => p.Id == userId);
user1.Name = "李四";
db.SaveChanges();
try
{
user2.Name = "王五";
ef.SaveChanges();
}
catch (DbUpdateConcurrencyException e)
{
Retry(ef, handleDbUpdateConcurrencyException: exception =>
{
exception = (e as DbUpdateConcurrencyException).Entries;
foreach (var item in exception)
{
item.OriginalValues.
SetValues(item.GetDatabaseValues());
}
});
}
}
}
}
}
class Program
{
static void Main(string[] args)
{
int userId = 1;
using (var db = new EfContext())
{
using (var ef = new EfContext())
{
User user1 = db.Users.FirstOrDefault(p => p.Id == userId);
User user2 = ef.Users.FirstOrDefault(p => p.Id == userId);
user1.Name = "李四";
db.SaveChanges();
try
{
user2.Name = "王五";
ef.SaveChanges();
}
catch (DbUpdateConcurrencyException e)
{
return;
}
}
}
}
}
上面代码运行后,只有李四会被更新到数据库中,王五因为并发冲突且异常捕获后没有进行任何处理而不会存入数据库。
3. 数据库和客户端合并获胜
这种方式是最复杂的,需要合并数据库和客户端的数据,如果用到此方法我们需要谨记如下两点:
如果原始值与数据库中的值不通,就说明数据库中的值已经被其他客户端更新,这时必须放弃当前的更新,保留数据库的更新;
如果原始值与数据库的值相同,代表不会发生并发冲突,按照正常处理流程处理即可。
同样,我们将上面的例子按照上面两点进行修改:
class Program
{
static void Main(string[] args)
{
int userId = 1;
using (var db = new EfContext())
{
using (var ef = new EfContext())
{
User user1 = db.Users.FirstOrDefault(p => p.Id == userId);
User user2 = ef.Users.FirstOrDefault(p => p.Id == userId);
user1.Name = "李四";
db.SaveChanges();
try
{
user2.Name = "王五";
ef.SaveChanges();
}
catch (DbUpdateConcurrencyException e)
{
Retry(ef, handleDbUpdateConcurrencyException: exception =>
{
exception = (e as DbUpdateConcurrencyException).Entries;
foreach (var item in exception)
{
Object dv = item.GetDatabaseValues();
Object ov = item.OriginalValues();
item.OriginalValues.SetValues(dv);
dv.PropertyNames.Where(property =>
!object.Equals(ov[property], dv[property])).ToList().ForEach(property =>
item.Property(property).IsModified = false);
}
});
}
}
}
}
}
前面两种方法都是利用 SaveChanges 捕获并发异常,其实我们也可以自定义 SaveChanges 的扩展方法来处理并发异常。下面我们就来看一下具体的两种策略。
1. 普通策略
这个策略非常简单,就是利用循环来实现重试机制,代码如下:
public static partial class DbContextExtensions
{
public static int SaveChanges(this DbContext dbContext, Action<IEnumerable<DbEntityEntry>> action,
int retryCount = 3)
{
if (retryCount <= 0)
{
throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必须大于0");
}
for (int retry=1;retry<retryCount;retry++)
{
try
{
}
catch (DbUpdateConcurrencyException e) when (retry < retryCount)
{
resolveConficts(e.Entries);
}
}
return dbContext.SaveChanges();
}
}
2. 高级策略
在 .NET 中已经有开发人员帮我们开发出了强大的工具 Polly ,Polly 是一个 .NET 弹性和瞬态故障处理库,允许开发人员以 Fluent 和线程安全的方式来实现重试、断路、超时、隔离和回退策略。
首先我们需要定义一个枚举类型
csharp
public enum RefreshConflict
{
StoreWins,
ClientWins,
MergeClientAndStore
}
然后根据不同的获胜模式来刷新数据库的值
public static class RefreshEFStateExtensions
{
public static EntityEntry Refresh(this EntityEntry tracking,
RefreshConflict refreshMode)
{
switch (refreshMode)
{
case RefreshConflict.StoreWins:
{
tracking.Reload();
break;
}
case RefreshConflict.ClientWins:
{
PropertyValues databaseValues = tracking.GetDatabaseValues();
if (databaseValues == null)
{
tracking.State = EntityState.Detached;
}
else
{
tracking.OriginalValues.SetValues(databaseValues);
}
break;
}
case RefreshConflict.MergeClientAndStore:
{
PropertyValues databaseValues = tracking.GetDatabaseValues();
if (databaseValues == null)
{
tracking.State = EntityState.Detached;
}
else
{
//当实体被更新时,刷新数据库原始值
PropertyValues originalValues = tracking.OriginalValues.Clone();
tracking.OriginalValues.SetValues(databaseValues);
//如果数据库中对于属性有不同的值保留数据库中的值
databaseValues.PropertyNames // Navigation properties are not included.
.Where(property => !object.Equals(originalValues[property], databaseValues[property]))
.ForEach(property => tracking.Property(property).IsModified = false);
databaseValues.Properties
.Where(property => !object.Equals(originalValues[property.Name],
databaseValues[property.Name]))
.ToList()
.ForEach(property =>
tracking.Property(property.Name).IsModified = false);
}
break;
}
}
return tracking;
}
}
最后定义刷新状态的方法
public static partial class DbContextExtensions
{
public static int SaveChanges(this DbContext context, RefreshConflict refreshMode, int retryCount = 3)
{
if (retryCount <= 0)
{
throw new ArgumentOutOfRangeException(nameof(retryCount), $"{retryCount}必须大于0");
}
return context.SaveChanges(
conflicts => conflicts.ToList().ForEach(tracking => tracking.Refresh(refreshMode)), retryCount);
}
public static int SaveChanges(
this DbContext context, RefreshConflict refreshMode, RetryStrategy retryStrategy) =>
context.SaveChanges(
conflicts => conflicts.ToList().ForEach(tracking => tracking.Refresh(refreshMode)), retryStrategy);
}
【END】
热 文 推 荐
☞Google 搜索点击量不到 50%?
☞99年少年12岁时买下100枚比特币, 如今却将所有积蓄压在一个不知名的代币上,还放话将超越Libra!