.NET Core MongoDB数据仓储和工作单元模式封装
前言
上一章我们把系统所需要的MongoDB集合设计好了,这一章我们的主要任务是使用.NET Core应用程序连接MongoDB并且封装MongoDB数据仓储和工作单元模式,因为本章内容涵盖的有点多关于仓储和工作单元的使用就放到下一章节中讲解了。仓储模式(Repository )带来的好处是一套代码可以适用于多个类,把常用的CRUD通用方法抽象出来通过接口形式集中管理,从而解除业务逻辑层与数据访问层之间的耦合,使业务逻辑层在存储、访问数据库时无须关心数据的来源及存储方式。工作单元模式(UnitOfWork)它是用来维护一个由已经被业务修改(如增加、删除和更新等)的业务对象组成的列表,跨多个请求的业务,统一管理事务,统一提交从而保障事物一致性的作用。
MongoDB从入门到实战的相关教程
MongoDB从入门到实战之Docker快速安装MongoDB👉
MongoDB从入门到实战之MongoDB工作常用操作命令👉
MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(1)-后端项目框架搭建👉
MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(2)-Swagger框架集成👉
MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(3)-系统数据集合设计👉
YyFlight.ToDoList项目源码地址
欢迎各位看官老爷review,有帮助的别忘了给我个Star哦💖!!!
GitHub地址:https://github.com/YSGStudyHards/YyFlight.ToDoList
MongoRepository地址:https://github.com/YSGStudyHards/YyFlight.ToDoList/tree/main/Repository/Repository
MongoDB事务使用前提说明
参阅MongoDB的事务
说明:
MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务,因为博主接下来都是在单机环境下操作,所以无法来演示Mongo事务操作,但是方法都已经是封装好了的,大家可以自己搭建集群实操。
原因:
MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。
MongoDB.Driver驱动安装
1、直接命令自动安装
Install-Package MongoDB.Driver
2、搜索Nuget手动安装
MongoSettings数据库连接配置
前往appsettings.json文件中配置Mongo数据库信息:
"MongoSettings": {
"Connection": "mongodb://root:123456@local:27017/yyflight_todolist?authSource=admin", //MongoDB连接字符串
"DatabaseName": "yyflight_todolist" //MongoDB数据库名称
}
定义Mongo DBContext上下文
现在我们将定义MongoDB DBContext上下文类,具体到一个业务对象或需要被持久化的对象,这个上下文类将封装数据库的连接和集合。
该类应负责建立与所需数据库的连接,在建立连接后,该类将在内存中或按请求持有数据库上下文(基于API管道中配置的生命周期管理。)
定义IMongoContext接口
public interface IMongoContext : IDisposable
{
/// <summary>
/// 添加命令操作
/// </summary>
/// <param name="func"></param>
/// <returns></returns>
Task AddCommandAsync(Func<Task> func);
/// <summary>
/// 保存更改
/// </summary>
/// <returns></returns>
Task<int> SaveChangesAsync();
/// <summary>
/// 获取集合数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="name"></param>
/// <returns></returns>
IMongoCollection<T> GetCollection<T>(string name);
}
定义MongoContext类
public class MongoContext : IMongoContext
{
private IMongoDatabase _database;
private MongoClient _mongoClient;
private readonly IConfiguration _configuration;
private readonly List<Func<Task>> _commands;
public IClientSessionHandle? Session = null;
public MongoContext(IConfiguration configuration)
{
_configuration = configuration;
// Every command will be stored and it'll be processed at SaveChanges
_commands = new List<Func<Task>>();
// Configure mongo (You can inject the config, just to simplify)
_mongoClient = new MongoClient(_configuration["MongoSettings:Connection"]);
_database = _mongoClient.GetDatabase(_configuration["MongoSettings:DatabaseName"]);
}
/// <summary>
/// 添加命令操作
/// </summary>
/// <param name="func">委托</param>
/// <returns></returns>
public async Task AddCommandAsync(Func<Task> func)
{
_commands.Add(func);
await Task.CompletedTask;
}
/// <summary>
/// 保存更改
/// TODO:MongoDB单机服务器不支持事务【使用MongoDB事务会报错:Standalone servers do not support transactions】,只有在集群情况下才支持事务
/// 原因:MongoDB在使用分布式事务时需要进行多节点之间的协调和通信,而单机环境下无法实现这样的分布式协调和通信机制。但是,在MongoDB部署为一个集群(cluster)后,将多个计算机连接为一个整体,通过协调和通信机制实现了分布式事务的正常使用。从数据一致性和可靠性的角度来看,在分布式系统中实现事务处理是至关重要的。而在单机环境下不支持事务,只有在集群情况下才支持事务的设计方式是为了保证数据一致性和可靠性,并且也符合分布式系统的设计思想。
/// </summary>
/// <returns></returns>
public async Task<int> SaveChangesAsync()
{
using (Session = await _mongoClient.StartSessionAsync())
{
Session.StartTransaction();
var commandTasks = _commands.Select(c => c());
await Task.WhenAll(commandTasks);
await Session.CommitTransactionAsync();
}
return _commands.Count;
}
/// <summary>
/// 获取MongoDB集合
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="name">集合名称</param>
/// <returns></returns>
public IMongoCollection<T> GetCollection<T>(string name)
{
return _database.GetCollection<T>(name);
}
/// <summary>
/// 释放上下文
/// </summary>
public void Dispose()
{
Session?.Dispose();
GC.SuppressFinalize(this);
}
}
定义通用泛型Repository
Repository(仓储)是DDD(领域驱动设计)中的经典思想,可以归纳为介于实际业务层(领域层)和数据访问层之间的层,能让领域层能在感觉不到数据访问层的情况下,完成与数据库的交互和以往的DAO(数据访问)层相比,Repository层的设计理念更偏向于面向对象,而淡化直接对数据表进行的CRUD操作。
定义IMongoRepository接口
定义一个泛型Repository通用接口,抽象常用的增加,删除,修改,查询等操作方法。
public interface IMongoRepository<T> where T : class, new()
{
#region 事务操作示例
/// <summary>
/// 事务添加数据
/// </summary>
/// <param name="objData">添加数据</param>
/// <returns></returns>
Task AddTransactionsAsync(T objData);
/// <summary>
/// 事务数据删除
/// </summary>
/// <param name="id">objectId</param>
/// <returns></returns>
Task DeleteTransactionsAsync(string id);
/// <summary>
/// 事务异步局部更新(仅更新一条记录)
/// </summary>
/// <param name="filter">过滤器</param>
/// <param name="update">更新条件</param>
/// <returns></returns>
Task UpdateTransactionsAsync(FilterDefinition<T> filter, UpdateDefinition<T> update);
#endregion
#region 添加相关操作
/// <summary>
/// 添加数据
/// </summary>
/// <param name="objData">添加数据</param>
/// <returns></returns>
Task AddAsync(T objData);
/// <summary>
/// 批量插入
/// </summary>
/// <param name="objDatas">实体集合</param>
/// <returns></returns>
Task InsertManyAsync(List<T> objDatas);
#endregion
#region 删除相关操作
/// <summary>
/// 数据删除
/// </summary>
/// <param name="id">objectId</param>
/// <returns></returns>
Task DeleteAsync(string id);
/// <summary>
/// 异步删除多条数据
/// </summary>
/// <param name="filter">删除的条件</param>
/// <returns></returns>
Task<DeleteResult> DeleteManyAsync(FilterDefinition<T> filter);
#endregion
#region 修改相关操作
/// <summary>
/// 指定对象异步修改一条数据
/// </summary>
/// <param name="obj">要修改的对象</param>
/// <param name="id">修改条件</param>
/// <returns></returns>
Task UpdateAsync(T obj, string id);
/// <summary>
/// 局部更新(仅更新一条记录)
/// <para><![CDATA[expression 参数示例:x => x.Id == 1 && x.Age > 18 && x.Gender == 0]]></para>
/// <para><![CDATA[entity 参数示例:y => new T{ RealName = "Ray", Gender = 1}]]></para>
/// </summary>
/// <param name="expression">筛选条件</param>
/// <param name="entity">更新条件</param>
/// <returns></returns>
Task UpdateAsync(Expression<Func<T, bool>> expression, Expression<Action<T>> entity);
/// <summary>
/// 异步局部更新(仅更新一条记录)
/// </summary>
/// <param name="filter">过滤器</param>
/// <param name="update">更新条件</param>
/// <returns></returns>
Task UpdateAsync(FilterDefinition<T> filter, UpdateDefinition<T> update);
/// <summary>
/// 异步局部更新(仅更新多条记录)
/// </summary>
/// <param name="expression">筛选条件</param>
/// <param name="update">更新条件</param>
/// <returns></returns>
Task UpdateManyAsync(Expression<Func<T, bool>> expression, UpdateDefinition<T> update);
/// <summary>
/// 异步批量修改数据
/// </summary>
/// <param name="dic">要修改的字段</param>
/// <param name="filter">更新条件</param>
/// <returns></returns>
Task<UpdateResult> UpdateManayAsync(Dictionary<string, string> dic, FilterDefinition<T> filter);
#endregion
#region 查询统计相关操作
/// <summary>
/// 通过ID主键获取数据
/// </summary>
/// <param name="id">objectId</param>
/// <returns></returns>
Task<T> GetByIdAsync(string id);
/// <summary>
/// 获取所有数据
/// </summary>
/// <returns></returns>
Task<IEnumerable<T>> GetAllAsync();
/// <summary>
/// 获取记录数
/// </summary>
/// <param name="expression">筛选条件</param>
/// <returns></returns>
Task<long> CountAsync(Expression<Func<T, bool>> expression);
/// <summary>
/// 获取记录数
/// </summary>
/// <param name="filter">过滤器</param>
/// <returns></returns>
Task<long> CountAsync(FilterDefinition<T> filter);
/// <summary>
/// 判断是否存在
/// </summary>
/// <param name="predicate">条件</param>
/// <returns></returns>
Task<bool> ExistsAsync(Expression<Func<T, bool>> predicate);
/// <summary>
/// 异步查询集合
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="field">要查询的字段,不写时查询全部</param>
/// <param name="sort">要排序的字段</param>
/// <returns></returns>
Task<List<T>> FindListAsync(FilterDefinition<T> filter, string[]? field = null, SortDefinition<T>? sort = null);
/// <summary>
/// 异步分页查询集合
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="pageIndex">当前页</param>
/// <param name="pageSize">页容量</param>
/// <param name="field">要查询的字段,不写时查询全部</param>
/// <param name="sort">要排序的字段</param>
/// <returns></returns>
Task<List<T>> FindListByPageAsync(FilterDefinition<T> filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition<T>? sort = null);
#endregion
}
实现泛型MongoBaseRepository基类
public class MongoBaseRepository<T> : IMongoRepository<T> where T : class, new()
{
protected readonly IMongoContext _context;
protected readonly IMongoCollection<T> _dbSet;
private readonly string _collectionName;
protected MongoBaseRepository(IMongoContext context)
{
_context = context;
_collectionName = typeof(T).GetAttributeValue((TableAttribute m) => m.Name) ?? typeof(T).Name;
_dbSet = _context.GetCollection<T>(_collectionName);
}
#region 事务操作示例
/// <summary>
/// 事务添加数据
/// </summary>
/// <param name="objData">添加数据</param>
/// <returns></returns>
public async Task AddTransactionsAsync(T objData)
{
await _context.AddCommandAsync(async () => await _dbSet.InsertOneAsync(objData));
}
/// <summary>
/// 事务数据删除
/// </summary>
/// <param name="id">objectId</param>
/// <returns></returns>
public async Task DeleteTransactionsAsync(string id)
{
await _context.AddCommandAsync(() => _dbSet.DeleteOneAsync(Builders<T>.Filter.Eq(" _id ", id)));
}
/// <summary>
/// 事务异步局部更新(仅更新一条记录)
/// </summary>
/// <param name="filter">过滤器</param>
/// <param name="update">更新条件</param>
/// <returns></returns>
public async Task UpdateTransactionsAsync(FilterDefinition<T> filter, UpdateDefinition<T> update)
{
await _context.AddCommandAsync(() => _dbSet.UpdateOneAsync(filter, update));
}
#endregion
#region 添加相关操作
/// <summary>
/// 添加数据
/// </summary>
/// <param name="objData">添加数据</param>
/// <returns></returns>
public async Task AddAsync(T objData)
{
await _dbSet.InsertOneAsync(objData);
}
/// <summary>
/// 批量插入
/// </summary>
/// <param name="objDatas">实体集合</param>
/// <returns></returns>
public async Task InsertManyAsync(List<T> objDatas)
{
await _dbSet.InsertManyAsync(objDatas);
}
#endregion
#region 删除相关操作
/// <summary>
/// 数据删除
/// </summary>
/// <param name="id">objectId</param>
/// <returns></returns>
public async Task DeleteAsync(string id)
{
await _dbSet.DeleteOneAsync(Builders<T>.Filter.Eq("_id", new ObjectId(id)));
}
/// <summary>
/// 异步删除多条数据
/// </summary>
/// <param name="filter">删除的条件</param>
/// <returns></returns>
public async Task<DeleteResult> DeleteManyAsync(FilterDefinition<T> filter)
{
return await _dbSet.DeleteManyAsync(filter);
}
#endregion
#region 修改相关操作
/// <summary>
/// 指定对象异步修改一条数据
/// </summary>
/// <param name="obj">要修改的对象</param>
/// <param name="id">修改条件</param>
/// <returns></returns>
public async Task UpdateAsync(T obj, string id)
{
//修改条件
FilterDefinition<T> filter = Builders<T>.Filter.Eq("_id", new ObjectId(id));
//要修改的字段
var list = new List<UpdateDefinition<T>>();
foreach (var item in obj.GetType().GetProperties())
{
if (item.Name.ToLower() == "id") continue;
list.Add(Builders<T>.Update.Set(item.Name, item.GetValue(obj)));
}
var updatefilter = Builders<T>.Update.Combine(list);
await _dbSet.UpdateOneAsync(filter, updatefilter);
}
/// <summary>
/// 局部更新(仅更新一条记录)
/// <para><![CDATA[expression 参数示例:x => x.Id == 1 && x.Age > 18 && x.Gender == 0]]></para>
/// <para><![CDATA[entity 参数示例:y => new T{ RealName = "Ray", Gender = 1}]]></para>
/// </summary>
/// <param name="expression">筛选条件</param>
/// <param name="entity">更新条件</param>
/// <returns></returns>
public async Task UpdateAsync(Expression<Func<T, bool>> expression, Expression<Action<T>> entity)
{
var fieldList = new List<UpdateDefinition<T>>();
if (entity.Body is MemberInitExpression param)
{
foreach (var item in param.Bindings)
{
var propertyName = item.Member.Name;
object propertyValue = null;
if (item is not MemberAssignment memberAssignment) continue;
if (memberAssignment.Expression.NodeType == ExpressionType.Constant)
{
if (memberAssignment.Expression is ConstantExpression constantExpression)
propertyValue = constantExpression.Value;
}
else
{
propertyValue = Expression.Lambda(memberAssignment.Expression, null).Compile().DynamicInvoke();
}
if (propertyName != "_id") //实体键_id不允许更新
{
fieldList.Add(Builders<T>.Update.Set(propertyName, propertyValue));
}
}
}
await _dbSet.UpdateOneAsync(expression, Builders<T>.Update.Combine(fieldList));
}
/// <summary>
/// 异步局部更新(仅更新一条记录)
/// </summary>
/// <param name="filter">过滤器</param>
/// <param name="update">更新条件</param>
/// <returns></returns>
public async Task UpdateAsync(FilterDefinition<T> filter, UpdateDefinition<T> update)
{
await _dbSet.UpdateOneAsync(filter, update);
}
/// <summary>
/// 异步局部更新(仅更新多条记录)
/// </summary>
/// <param name="expression">筛选条件</param>
/// <param name="update">更新条件</param>
/// <returns></returns>
public async Task UpdateManyAsync(Expression<Func<T, bool>> expression, UpdateDefinition<T> update)
{
await _dbSet.UpdateManyAsync(expression, update);
}
/// <summary>
/// 异步批量修改数据
/// </summary>
/// <param name="dic">要修改的字段</param>
/// <param name="filter">更新条件</param>
/// <returns></returns>
public async Task<UpdateResult> UpdateManayAsync(Dictionary<string, string> dic, FilterDefinition<T> filter)
{
T t = new T();
//要修改的字段
var list = new List<UpdateDefinition<T>>();
foreach (var item in t.GetType().GetProperties())
{
if (!dic.ContainsKey(item.Name)) continue;
var value = dic[item.Name];
list.Add(Builders<T>.Update.Set(item.Name, value));
}
var updatefilter = Builders<T>.Update.Combine(list);
return await _dbSet.UpdateManyAsync(filter, updatefilter);
}
#endregion
#region 查询统计相关操作
/// <summary>
/// 通过ID主键获取数据
/// </summary>
/// <param name="id">objectId</param>
/// <returns></returns>
public async Task<T> GetByIdAsync(string id)
{
var queryData = await _dbSet.FindAsync(Builders<T>.Filter.Eq("_id", new ObjectId(id)));
return queryData.FirstOrDefault();
}
/// <summary>
/// 获取所有数据
/// </summary>
/// <returns></returns>
public async Task<IEnumerable<T>> GetAllAsync()
{
var queryAllData = await _dbSet.FindAsync(Builders<T>.Filter.Empty);
return queryAllData.ToList();
}
/// <summary>
/// 获取记录数
/// </summary>
/// <param name="expression">筛选条件</param>
/// <returns></returns>
public async Task<long> CountAsync(Expression<Func<T, bool>> expression)
{
return await _dbSet.CountDocumentsAsync(expression);
}
/// <summary>
/// 获取记录数
/// </summary>
/// <param name="filter">过滤器</param>
/// <returns></returns>
public async Task<long> CountAsync(FilterDefinition<T> filter)
{
return await _dbSet.CountDocumentsAsync(filter);
}
/// <summary>
/// 判断是否存在
/// </summary>
/// <param name="predicate">条件</param>
/// <returns></returns>
public async Task<bool> ExistsAsync(Expression<Func<T, bool>> predicate)
{
return await Task.FromResult(_dbSet.AsQueryable().Any(predicate));
}
/// <summary>
/// 异步查询集合
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="field">要查询的字段,不写时查询全部</param>
/// <param name="sort">要排序的字段</param>
/// <returns></returns>
public async Task<List<T>> FindListAsync(FilterDefinition<T> filter, string[]? field = null, SortDefinition<T>? sort = null)
{
//不指定查询字段
if (field == null || field.Length == 0)
{
if (sort == null) return await _dbSet.Find(filter).ToListAsync();
return await _dbSet.Find(filter).Sort(sort).ToListAsync();
}
//指定查询字段
var fieldList = new List<ProjectionDefinition<T>>();
for (int i = 0; i < field.Length; i++)
{
fieldList.Add(Builders<T>.Projection.Include(field[i].ToString()));
}
var projection = Builders<T>.Projection.Combine(fieldList);
fieldList?.Clear();
//不排序
if (sort == null) return await _dbSet.Find(filter).Project<T>(projection).ToListAsync();
//排序查询
return await _dbSet.Find(filter).Sort(sort).Project<T>(projection).ToListAsync();
}
/// <summary>
/// 异步分页查询集合
/// </summary>
/// <param name="filter">查询条件</param>
/// <param name="pageIndex">当前页</param>
/// <param name="pageSize">页容量</param>
/// <param name="field">要查询的字段,不写时查询全部</param>
/// <param name="sort">要排序的字段</param>
/// <returns></returns>
public async Task<List<T>> FindListByPageAsync(FilterDefinition<T> filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition<T>? sort = null)
{
//不指定查询字段
if (field == null || field.Length == 0)
{
if (sort == null) return await _dbSet.Find(filter).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
//进行排序
return await _dbSet.Find(filter).Sort(sort).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
//指定查询字段
var fieldList = new List<ProjectionDefinition<T>>();
for (int i = 0; i < field.Length; i++)
{
fieldList.Add(Builders<T>.Projection.Include(field[i].ToString()));
}
var projection = Builders<T>.Projection.Combine(fieldList);
fieldList?.Clear();
//不排序
if (sort == null) return await _dbSet.Find(filter).Project<T>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
//排序查询
return await _dbSet.Find(filter).Sort(sort).Project<T>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync();
}
#endregion
}
工作单元模式
工作单元模式是“维护一个被业务事务影响的对象列表,协调变化的写入和并发问题的解决”。具体来说,在C#工作单元模式中,我们通过UnitOfWork对象来管理多个Repository对象,同时UnitOfWork还提供了对事务的支持。对于一组需要用到多个Repository的业务操作,我们可以在UnitOfWork中创建一个事务,并将多个Repository操作放在同一个事务中处理,以保证数据的一致性。当所有Repository操作完成后,再通过UnitOfWork提交事务或者回滚事务。
定义IUnitOfWork接口
/// <summary>
/// 工作单元接口
/// </summary>
public interface IUnitOfWork : IDisposable
{
/// <summary>
/// 提交保存更改
/// </summary>
/// <returns></returns>
Task<bool> Commit();
}
定义UnitOfWork类
/// <summary>
/// 工作单元类
/// </summary>
public class UnitOfWork : IUnitOfWork
{
private readonly IMongoContext _context;
public UnitOfWork(IMongoContext context)
{
_context = context;
}
/// <summary>
/// 提交保存更改
/// </summary>
/// <returns></returns>
public async Task<bool> Commit()
{
return await _context.SaveChangesAsync() > 0;
}
public void Dispose()
{
_context.Dispose();
}
}
注册数据库基础操作和工作单元
//注册数据库基础操作和工作单元
builder.Services.AddScoped<IMongoContext, MongoContext>();
builder.Services.AddScoped<IUnitOfWork, UnitOfWork>();
参考文章
NoSQL – MongoDB Repository Implementation in .NET Core with Unit Testing example
ASP.NET CORE – MONGODB REPOSITORY PATTERN & UNIT OF WORK