ABP+WorkflowCore+jsplumb实现工作流
(给DotNet加星标,提升.Net技能)
转自:Metto cnblogs.com/lispring/p/13738600.html
前言
ABP目前已经是很成熟的开发框架了,它提供了很多我们日常开发所必须的功能,并且很方便扩展,让我们能更专注于业务的开发。但是ABP官方并没有给我们实现工作流。
在.NET Core环境下的开源工作流引擎很少,其中WorkflowCore是一款轻量级工作流引擎,对于小型工作流和责任链类型的需求开发很适合,但只能通过后台编码或者json的方式定义工作流程,看了源码后觉得扩展性还是挺好的,至少能满足我的需求,于是选择对它下手。
jsPlumb是一个开源的比较强大的绘图组件,这里不多介绍,我就是用它实现一个简单的流程设计器。
花了差不多一个月的时间,把这三者结合到一起实现一个简单而强大的工作流模块。
一、ABP模块实现WorkflowCore持久化存储接口(IPersistenceProvider)
这里我参考了WorkflowCore.Persistence.EntityFramework持久化项目的实现方式,用ABP的方式实现了WorkflowCore的持久化。这样做有两个好处:
1、让工作流能支持ABP的多租户和全局数据过滤功能
2、数据库操作能使用统一的数据上下文,方便事务提交和回滚。
ABP实现的流程Workflow持久化存储所必须的实体类
其中PersistedWorkflowDefinition是用来持久化存储流程定义(在Workflow中流程定义在内存中)如下图:
实现IPersistenceProvider接口
/// <summary>
/// Persistence contract of the ABP workflow module: everything declared by
/// <see cref="IPersistenceProvider"/> plus direct lookups of the persisted entities.
/// </summary>
public interface IAbpPersistenceProvider : IPersistenceProvider
{
    /// <summary>Looks up the persisted workflow entity with the given id.</summary>
    /// <param name="id">Primary key of the persisted workflow.</param>
    Task<PersistedWorkflow> GetPersistedWorkflow(Guid id);

    /// <summary>Looks up a persisted workflow definition by id and version.</summary>
    /// <param name="id">Id of the workflow definition.</param>
    /// <param name="version">Version of the workflow definition.</param>
    Task<PersistedWorkflowDefinition> GetPersistedWorkflowDefinition(string id, int version);

    /// <summary>Looks up the persisted execution pointer with the given id.</summary>
    /// <param name="id">Primary key of the persisted execution pointer.</param>
    Task<PersistedExecutionPointer> GetPersistedExecutionPointer(string id);
}
public class AbpPersistenceProvider : DomainService, IAbpPersistenceProvider
{
protected readonly IRepository<PersistedEvent, Guid> _eventRepository;
protected readonly IRepository<PersistedExecutionPointer, string> _executionPointerRepository;
protected readonly IRepository<PersistedWorkflow, Guid> _workflowRepository;
protected readonly IRepository<PersistedWorkflowDefinition, string > _workflowDefinitionRepository;
protected readonly IRepository<PersistedSubscription, Guid> _eventSubscriptionRepository;
protected readonly IRepository<PersistedExecutionError, Guid> _executionErrorRepository;
protected readonly IGuidGenerator _guidGenerator;
protected readonly IAsyncQueryableExecuter _asyncQueryableExecuter;
public IAbpSession AbpSession { get; set; }
public AbpPersistenceProvider(IRepository<PersistedEvent, Guid> eventRepository, IRepository<PersistedExecutionPointer, string> executionPointerRepository, IRepository<PersistedWorkflow, Guid> workflowRepository, IRepository<PersistedSubscription, Guid> eventSubscriptionRepository, IGuidGenerator guidGenerator, IAsyncQueryableExecuter asyncQueryableExecuter, IRepository<PersistedExecutionError, Guid> executionErrorRepository, IRepository<PersistedWorkflowDefinition, string > workflowDefinitionRepository)
{
_eventRepository = eventRepository;
_executionPointerRepository = executionPointerRepository;
_workflowRepository = workflowRepository;
_eventSubscriptionRepository = eventSubscriptionRepository;
_guidGenerator = guidGenerator;
_asyncQueryableExecuter = asyncQueryableExecuter;
_executionErrorRepository = executionErrorRepository;
_workflowDefinitionRepository = workflowDefinitionRepository;
}
[UnitOfWork]
public virtual async Task<string> CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = _guidGenerator.Create().ToString();
var persistable = subscription.ToPersistable();
await _eventSubscriptionRepository.InsertAsync(persistable);
return subscription.Id;
}
[UnitOfWork]
public virtual async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
{
workflow.Id = _guidGenerator.Create().ToString();
var persistable = workflow.ToPersistable();
if (AbpSession.UserId.HasValue)
{
var userCache = AbpSession.GetCurrentUser();
persistable.CreateUserIdentityName = userCache.FullName;
}
await _workflowRepository.InsertAsync(persistable);
return workflow.Id;
}
[UnitOfWork]
public virtual async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
{
var now = asAt.ToUniversalTime().Ticks;
var query = _workflowRepository.GetAll().Where(x => x.NextExecution.HasValue && (x.NextExecution <= now) && (x.Status == WorkflowStatus.Runnable))
.Select(x => x.Id);
var raw = await _asyncQueryableExecuter.ToListAsync(query);
return raw.Select(s => s.ToString()).ToList();
}
[UnitOfWork]
public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
{ IQueryable<PersistedWorkflow> query = _workflowRepository.GetAll()
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsQueryable();
if (status.HasValue)
query = query.Where(x => x.Status == status.Value);
if (!String.IsNullOrEmpty(type))
query = query.Where(x => x.WorkflowDefinitionId == type);
if (createdFrom.HasValue)
query = query.Where(x => x.CreateTime >= createdFrom.Value);
if (createdTo.HasValue)
query = query.Where(x => x.CreateTime <= createdTo.Value);
var rawResult = await query.Skip(skip).Take(take).ToListAsync();
List<WorkflowInstance> result = new List<WorkflowInstance>();
foreach (var item in rawResult)
result.Add(item.ToWorkflowInstance());
return result;
}
[UnitOfWork]
public virtual async Task<WorkflowInstance> GetWorkflowInstance(string Id)
{
var uid = new Guid(Id);
var raw = await _workflowRepository.GetAll()
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.FirstAsync(x => x.Id == uid);
if (raw == null)
return null;
return raw.ToWorkflowInstance();
}
[UnitOfWork]
public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
{
if (ids == null)
{
return new List<WorkflowInstance>();
}
var uids = ids.Select(i => new Guid(i));
var raw = _workflowRepository.GetAll()
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.Where(x => uids.Contains(x.Id));
return (await raw.ToListAsync()).Select(i => i.ToWorkflowInstance());
}
[UnitOfWork]
public virtual async Task PersistWorkflow(WorkflowInstance workflow)
{
var uid = new Guid(workflow.Id);
var existingEntity = await _workflowRepository.GetAll()
.Where(x => x.Id == uid)
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsTracking()
.FirstAsync();
var persistable = workflow.ToPersistable(existingEntity);
await CurrentUnitOfWork.SaveChangesAsync();
}
[UnitOfWork]
public virtual async Task TerminateSubscription(string eventSubscriptionId)
{
var uid = new Guid(eventSubscriptionId);
var existing = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == uid);
_eventSubscriptionRepository.Delete(existing);
await CurrentUnitOfWork.SaveChangesAsync();
}
/// <summary>
/// Deliberate no-op: this provider does nothing here to create or verify the store.
/// </summary>
[UnitOfWork]
public virtual void EnsureStoreExists()
{
    // NOTE(review): presumably the schema is created elsewhere (e.g. by migrations) — confirm.
}
[UnitOfWork]
public virtual async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf)
{
asOf = asOf.ToUniversalTime();
var raw = await _eventSubscriptionRepository.GetAll()
.Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf)
.ToListAsync();
return raw.Select(item => item.ToEventSubscription()).ToList();
}
[UnitOfWork]
public virtual async Task<string> CreateEvent(Event newEvent)
{
newEvent.Id = _guidGenerator.Create().ToString();
var persistable = newEvent.ToPersistable();
var result = _eventRepository.InsertAsync(persistable);
await CurrentUnitOfWork.SaveChangesAsync();
return newEvent.Id;
}
[UnitOfWork]
public virtual async Task<Event> GetEvent(string id)
{
Guid uid = new Guid(id);
var raw = await _eventRepository
.FirstOrDefaultAsync(x => x.Id == uid);
if (raw == null)
return null;
return raw.ToEvent();
}
[UnitOfWork]
public virtual async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt)
{
var now = asAt.ToUniversalTime();
asAt = asAt.ToUniversalTime();
var raw = await _eventRepository.GetAll()
.Where(x => !x.IsProcessed)
.Where(x => x.EventTime <= now)
.Select(x => x.Id)
.ToListAsync();
return raw.Select(s => s.ToString()).ToList();
}
[UnitOfWork]
public virtual async Task MarkEventProcessed(string id)
{
var uid = new Guid(id);
var existingEntity = await _eventRepository.GetAll()
.Where(x => x.Id == uid)
.AsTracking()
.FirstAsync();
existingEntity.IsProcessed = true;
await CurrentUnitOfWork.SaveChangesAsync();
}
[UnitOfWork]
public virtual async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf)
{
var raw = await _eventRepository.GetAll()
.Where(x => x.EventName == eventName && x.EventKey == eventKey)
.Where(x => x.EventTime >= asOf)
.Select(x => x.Id)
.ToListAsync();
var result = new List<string>();
foreach (var s in raw)
result.Add(s.ToString());
return result;
}
[UnitOfWork]
public virtual async Task MarkEventUnprocessed(string id)
{
var uid = new Guid(id);
var existingEntity = await _eventRepository.GetAll()
.Where(x => x.Id == uid)
.AsTracking()
.FirstAsync();
existingEntity.IsProcessed = false;
await CurrentUnitOfWork.SaveChangesAsync();
}
[UnitOfWork
public virtual async Task PersistErrors(IEnumerable<ExecutionError> errors)
{
var executionErrors = errors as ExecutionError[] ?? errors.ToArray();
if (executionErrors.Any())
{
foreach (var error in executionErrors)
{
await _executionErrorRepository.InsertAsync(error.ToPersistable());
}
await CurrentUnitOfWork.SaveChangesAsync();
}
}
[UnitOfWork]
public virtual async Task<EventSubscription> GetSubscription(string eventSubscriptionId)
{
var uid = new Guid(eventSubscriptionId);
var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == uid);
return raw?.ToEventSubscription();
}
[UnitOfWork]
public virtual async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf)
{
var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf && x.ExternalToken == null);
return raw?.ToEventSubscription();
}
[UnitOfWork]
public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
var uid = new Guid(eventSubscriptionId);
var existingEntity = await _eventSubscriptionRepository.GetAll()
.Where(x => x.Id == uid)
.AsTracking()
.FirstAsync();
existingEntity.ExternalToken = token;
existingEntity.ExternalWorkerId = workerId;
existingEntity.ExternalTokenExpiry = expiry;
await CurrentUnitOfWork.SaveChangesAsync();
return true;
}
[UnitOfWork]
public virtual async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
var uid = new Guid(eventSubscriptionId);
var existingEntity = await _eventSubscriptionRepository.GetAll()
.Where(x => x.Id == uid)
.AsTracking()
.FirstAsync();
if (existingEntity.ExternalToken != token)
throw new InvalidOperationException();
existingEntity.ExternalToken = null;
existingEntity.ExternalWorkerId = null;
existingEntity.ExternalTokenExpiry = null;
await CurrentUnitOfWork.SaveChangesAsync();
}
public Task<PersistedWorkflow> GetPersistedWorkflow(Guid id)
{
return _workflowRepository.GetAsync(id);
}
public Task<PersistedWorkflowDefinition> GetPersistedWorkflowDefinition(string id, int version)
{
return _workflowDefinitionRepository.GetAll().AsNoTracking().FirstOrDefaultAsync(u => u.Id == id && u.Version == version);
}
public Task<PersistedExecutionPointer> GetPersistedExecutionPointer(string id)
{
return _executionPointerRepository.GetAsync(id);
}
}
服务注册添加AddWorkflow时把IPersistenceProvider提供的默认实现换成AbpPersistenceProvider
/// <summary>
/// Service-collection helpers that wire WorkflowCore up with the ABP persistence provider.
/// </summary>
public static class ServiceCollectionExtensions
{
    /// <summary>
    /// Registers <see cref="AbpPersistenceProvider"/> as the <see cref="IPersistenceProvider"/>,
    /// adds WorkflowCore configured to use it, and adds the workflow DSL services.
    /// </summary>
    /// <param name="services">Collection to add the registrations to.</param>
    /// <param name="setupAction">Optional extra configuration, run after persistence is configured.</param>
    /// <returns>The same <paramref name="services"/> instance.</returns>
    public static IServiceCollection AddAbpWorkflow(this IServiceCollection services, Action<WorkflowOptions> setupAction = null)
    {
        services.AddSingleton<IPersistenceProvider, AbpPersistenceProvider>();

        // NOTE(review): the factory resolves the concrete AbpPersistenceProvider type,
        // while the line above registers it only under IPersistenceProvider — this relies
        // on the concrete type being resolvable from the container; confirm.
        Action<WorkflowOptions> configure = options =>
        {
            options.UsePersistence(provider => provider.GetService<AbpPersistenceProvider>());
            if (setupAction != null)
            {
                setupAction.Invoke(options);
            }
        };

        services.AddWorkflow(configure);
        services.AddWorkflowDSL();
        return services;
    }
}
到此为止,ABP已经实现了WorkflowCore的默认的持久化存储。
二、ABP中AbpWorkflow和AbpStepBody的自定义注册
为了满足开发人员和用户的需求,我提供了两种流程注册方式,一种是开发人员后台编码定义固定流程另一种是用户通过流程设计器实现自定义业务流程。
开发人员后台编码定义固定流程
这里参考ABP的EventBus注册方式,实现IWindsorInstaller ,在组件注册时拦截并注册:
/// <summary>
/// ABP workflow interface: a WorkflowCore workflow whose data type is
/// <see cref="WorkflowParamDictionary"/>. Declares no members of its own.
/// </summary>
public interface IAbpWorkflow : IWorkflow<WorkflowParamDictionary> { }
/// <summary>
/// Workflow registration interface.
/// </summary>
public interface IAbpWorkflowRegisty
{
    /// <summary>Registers the workflow implemented by <paramref name="type"/>.</summary>
    /// <param name="type">Type of the workflow to register.</param>
    void RegisterWorkflow(Type type);
}
/// <summary>
/// ABP workflow registration implementation: resolves a workflow type from the IoC
/// container and hands the instance to the WorkflowCore registry.
/// </summary>
public class AbpWorkflowRegisty : IAbpWorkflowRegisty, ISingletonDependency
{
    private IWorkflowRegistry _workflowRegistry;
    private readonly IIocManager _iocManager;

    public AbpWorkflowRegisty(IWorkflowRegistry workflowRegistry, IIocManager iocManager)
    {
        _workflowRegistry = workflowRegistry;
        _iocManager = iocManager;
    }

    /// <summary>
    /// Resolves <paramref name="type"/> and registers the resulting workflow.
    /// </summary>
    /// <param name="type">Type to resolve; the resolved object must implement <see cref="IAbpWorkflow"/>.</param>
    /// <exception cref="AbpException">The resolved object does not implement <see cref="IAbpWorkflow"/>.</exception>
    public void RegisterWorkflow(Type type)
    {
        var resolved = _iocManager.Resolve(type);

        // "as" yields null both for a null instance and for a non-workflow instance,
        // which are exactly the cases that must be rejected.
        var abpWorkflow = resolved as IAbpWorkflow;
        if (abpWorkflow == null)
        {
            throw new AbpException("RegistType must implement from AbpWorkflow!");
        }

        IWorkflow<WorkflowParamDictionary> definition = abpWorkflow;
        _workflowRegistry.RegisterWorkflow(definition);
    }
}
/// <summary>
/// Interceptor implementation: a Windsor installer that listens for component
/// registrations and passes every <see cref="IAbpWorkflow"/> implementation to the
/// workflow registry.
/// </summary>
internal class WorkflowInstaller : IWindsorInstaller
{
    private readonly IIocResolver _iocResolver;
    private IAbpWorkflowRegisty serviceSelector;

    public WorkflowInstaller(IIocResolver iocResolver)
    {
        _iocResolver = iocResolver;
    }

    /// <summary>
    /// Resolves the workflow registry from <paramref name="container"/> and subscribes to
    /// the kernel's <c>ComponentRegistered</c> event.
    /// </summary>
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        serviceSelector = container.Resolve<IAbpWorkflowRegisty>();
        container.Kernel.ComponentRegistered += Kernel_ComponentRegistered;
    }

    /// <summary>
    /// Registers the component's implementation type as a workflow, once, when that type
    /// implements <see cref="IAbpWorkflow"/>; ignores every other component.
    /// </summary>
    private void Kernel_ComponentRegistered(string key, IHandler handler)
    {
        var implementation = handler.ComponentModel.Implementation;

        if (!typeof(IAbpWorkflow).GetTypeInfo().IsAssignableFrom(implementation))
        {
            return;
        }

        // The old interface loop never matched when the implementation was itself an
        // interface type (GetInterfaces() excludes the type itself); keep skipping it.
        if (implementation.GetTypeInfo().IsInterface)
        {
            return;
        }

        // One registration per component. Looping over the interfaces registered the same
        // type once per IAbpWorkflow-derived interface it implements, i.e. more than once
        // for a type that has several.
        serviceSelector.RegisterWorkflow(implementation);
    }
}
到这里,把拦截器注册到模块类的Initialize中,开发人员定义流程只需要实现IAbpWorkflow接口,系统启动时会自动注册。如图:
自定义注册StepBody
这里参考ABP中标准的配置模式(不清楚的可以去看下ABP的源码,ABP的配置系统和权限系统都是这样配置的),将注册的StepBody存储在内存中提供给用户自定义组合流程节点使用,下列代码展示了注册指定用户审核的StepBody,执行方法体的实现:
/// <summary>
/// Step-body provider that defines the "FixedUserAudit" step body, backed by
/// <see cref="GeneralAuditingStepBody"/>.
/// </summary>
public class DefaultStepBodyProvider : AbpStepBodyProvider
{
    /// <summary>
    /// Creates the fixed-user audit step body definition, with its single "UserId" input,
    /// in <paramref name="context"/>.
    /// </summary>
    public override void Build(IAbpStepBodyDefinitionContext context)
    {
        var fixedUserAudit = new AbpWorkflowStepBody
        {
            Name = "FixedUserAudit",
            DisplayName = "指定用户审核",
            StepBodyType = typeof(GeneralAuditingStepBody)
        };

        var auditorInput = new WorkflowParam
        {
            // Defines the front-end input type; derives from Abp.UI.Inputs.InputTypeBase.
            InputType = new SelectUserInputType(),
            Name = "UserId",
            DisplayName = "审核人"
        };
        fixedUserAudit.Inputs.Add(auditorInput);

        context.Create(fixedUserAudit);
    }
}
/// <summary>
/// Step body for approval by one designated user. The first run notifies the auditor,
/// records an un-audited auditor row and waits for the audit event; the run after the
/// event checks whether the auditor passed the step.
/// </summary>
public class GeneralAuditingStepBody : StepBody, ITransientDependency
{
    private const string ActionName = "AuditEvent";

    protected readonly INotificationPublisher _notificationPublisher;
    protected readonly IAbpPersistenceProvider _abpPersistenceProvider;
    protected readonly UserManager _userManager;
    public readonly IRepository<PersistedWorkflowAuditor, Guid> _auditorRepository;

    public GeneralAuditingStepBody(
        INotificationPublisher notificationPublisher,
        UserManager userManager,
        IAbpPersistenceProvider abpPersistenceProvider,
        IRepository<PersistedWorkflowAuditor, Guid> auditorRepository)
    {
        _notificationPublisher = notificationPublisher;
        _userManager = userManager;
        _abpPersistenceProvider = abpPersistenceProvider;
        _auditorRepository = auditorRepository;
    }

    /// <summary>
    /// Id of the auditing user.
    /// </summary>
    public long UserId { get; set; }

    /// <summary>
    /// Runs the step. Before the event is published it requests the audit and returns a
    /// wait-for-event result; afterwards it returns <c>ExecutionResult.Next()</c>, first
    /// setting the workflow status to <see cref="WorkflowStatus.Complete"/> when no
    /// passing audit record by <see cref="UserId"/> exists for this pointer.
    /// </summary>
    [UnitOfWork]
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        if (context.ExecutionPointer.EventPublished)
        {
            var pointerId = context.ExecutionPointer.Id;
            var approved = _auditorRepository.GetAll().Any(u =>
                u.ExecutionPointerId == pointerId
                && u.UserId == UserId
                && u.Status == Abp.Entitys.CommEnum.EnumAuditStatus.Pass);

            if (!approved)
            {
                // No passing audit record: the workflow is marked complete here.
                context.Workflow.Status = WorkflowStatus.Complete;
            }

            return ExecutionResult.Next();
        }

        // NOTE(review): .Result / .Wait() block on async calls inside this synchronous
        // Run override — confirm this is acceptable for the hosting environment.
        var workflow = _abpPersistenceProvider
            .GetPersistedWorkflow(context.Workflow.Id.ToGuid())
            .Result;
        // NOTE(review): the definition lookup can yield null; .Title below assumes it exists.
        var workflowDefinition = _abpPersistenceProvider
            .GetPersistedWorkflowDefinition(context.Workflow.WorkflowDefinitionId, context.Workflow.Version)
            .Result;
        var userIdentityName = _userManager.Users
            .Where(u => u.Id == workflow.CreatorUserId)
            .Select(u => u.FullName)
            .FirstOrDefault();

        // Notify the auditor.
        var notification = new Abp.Notifications.TaskNotificationData($"【{userIdentityName}】提交的{workflowDefinition.Title}需要您审批!");
        _notificationPublisher.PublishTaskAsync(
            notification,
            userIds: new UserIdentifier[] { new UserIdentifier(workflow.TenantId, UserId) },
            entityIdentifier: new EntityIdentifier(workflow.GetType(), workflow.Id)
        ).Wait();

        // Add the auditor record.
        var auditUserInfo = _userManager.GetUserById(UserId);
        var auditorRecord = new PersistedWorkflowAuditor()
        {
            WorkflowId = workflow.Id,
            ExecutionPointerId = context.ExecutionPointer.Id,
            Status = Abp.Entitys.CommEnum.EnumAuditStatus.UnAudited,
            UserId = UserId,
            TenantId = workflow.TenantId,
            UserHeadPhoto = auditUserInfo.HeadImage,
            UserIdentityName = auditUserInfo.FullName
        };
        _auditorRepository.Insert(auditorRecord);

        // Wait for the audit event under a fresh key, effective from DateTime.MinValue.
        return ExecutionResult.WaitForEvent(ActionName, Guid.NewGuid().ToString(), DateTime.MinValue);
    }
}
三、设计器实现
流程设计器我用的是Abp提供的Vue项目模板+jsplumb来实现的,话不多说直接上图吧:
上图所示,每个节点执行操作选择的是我们后台注册的AbpStepBody。
注:开发人员可根据业务需求尽可能的给用户提供所需的StepBody。
这样一来,整个流程的灵活性是非常好的。
四、设计器提交的流程数据转换成WorkflowCore支持的Json数据结构
前端传给后台的数据结构如下:
后台接收数据后转换成WorkflowCore支持的Json字符串,再使用WorkflowCore.DSL提供的帮助类注册流程即可,转换后的Json如下:
{
"DataType": "System.Collections.Generic.Dictionary`2[[System.String, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.Object, System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], System.Private.CoreLib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e",
"DefaultErrorBehavior": 0,
"DefaultErrorRetryInterval": null,
"Steps": [{
"StepType": "Abp.Workflows.DefaultSteps.NullStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
"Id": "start_1600248885360yurl0hgrvpd",
"Name": "start_1600248885360yurl0hgrvpd",
"CancelCondition": null,
"ErrorBehavior": null,
"RetryInterval": null,
"Do": [],
"CompensateWith": [],
"Saga": false,
"NextStepId": null,
"Inputs": {},
"Outputs": {},
"SelectNextStep": {
"step_1600248890720r3o927aajy8": "1==1"
}
}, {
"StepType": "Abp.Workflows.StepBodys.GeneralAuditingStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
"Id": "step_1600248890720r3o927aajy8",
"Name": "step_1600248890720r3o927aajy8",
"CancelCondition": null,
"ErrorBehavior": null,
"RetryInterval": null,
"Do": [],
"CompensateWith": [],
"Saga": false,
"NextStepId": null,
"Inputs": {
"UserId": "\"4\""
},
"Outputs": {},
"SelectNextStep": {
"end_16002488928403hmjauowus7": "decimal.Parse(data[\"Days\"].ToString()) <= 1",
"step_160032897781681o9ko9j3nr": "decimal.Parse(data[\"Days\"].ToString()) > 1"
}
}, {
"StepType": "Abp.Workflows.DefaultSteps.SendNotificationToInitiatorStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
"Id": "end_16002488928403hmjauowus7",
"Name": "end_16002488928403hmjauowus7",
"CancelCondition": null,
"ErrorBehavior": null,
"RetryInterval": null,
"Do": [],
"CompensateWith": [],
"Saga": false,
"NextStepId": null,
"Inputs": {
"Message": "\"您的流程已完成\""
},
"Outputs": {},
"SelectNextStep": {}
}, {
"StepType": "Abp.Workflows.StepBodys.GeneralAuditingStepBody, Abp.Workflows, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
"Id": "step_160032897781681o9ko9j3nr",
"Name": "step_160032897781681o9ko9j3nr",
"CancelCondition": null,
"ErrorBehavior": null,
"RetryInterval": null,
"Do": [],
"CompensateWith": [],
"Saga": false,
"NextStepId": null,
"Inputs": {
"UserId": "\"5\""
},
"Outputs": {},
"SelectNextStep": {
"end_16002488928403hmjauowus7": "1==1"
}
}],
"Id": "c51e908f-60e3-4a01-ab63-3bce0eaedc48",
"Version": 1,
"Description": "请假"
}
总结
一句话,上面所写的一切都是为了将流程注册到WorkflowCore中而做的铺垫。
后面我会把代码整理一份作为一个ABP的独立模块开源出来供大家参考!
有四年没写博客了,很多东西写着写着觉得没意思,就不写了,这篇写得不好希望各位博友口下留情!
- EOF -
看完本文有收获?请转发分享给更多人
关注「DotNet」加星标,提升.Net技能
好文章,我在看❤️