使用工作流引擎
1. 创建StateMachine实例
var (
sharedStorageClusterSmOnce sync.Once
sharedStorageClusterStateMachine *statemachine.StateMachine
)
func GetSharedStorageClusterStateMachine() *statemachine.StateMachine {
sharedStorageClusterSmOnce.Do(func() {
if sharedStorageClusterStateMachine == nil {
sharedStorageClusterStateMachine = statemachine.CreateStateMachineInstance(ResourceType)
sharedStorageClusterStateMachine.RegisterStableState(statemachine.StateRunning, statemachine.StateInterrupt, statemachine.StateInit)
}
})
return sharedStorageClusterStateMachine
}
2. 创建WorkflowManager实例
var ResourceType = "shared"
var WorkFlowMetaDir = ""./pkg/workflow""
var (
sharedStorageClusterWfOnce sync.Once
sharedStorageClusterWfManager *wfengine.WfManager
)
func GetSharedStorageClusterWfManager() *wfengine.WfManager {
sharedStorageClusterWfOnce.Do(func() {
if sharedStorageClusterWfManager == nil {
var err error
sharedStorageClusterWfManager, err = createWfManager(ResourceType, WorkFlowMetaDir)
sharedStorageClusterWfManager.RegisterRecover(wfengineimpl.CreateDefaultRecover())
if err != nil {
panic(fmt.Sprintf("create %s wf manager failed: %v", ResourceType, err))
}
}
})
return sharedStorageClusterWfManager
}
func createWfManager(resourceType, workFlowMetaDir string) (wfManager *wfengine.WfManager, err error) {
wfManager, err = wfengine.CreateWfManager(
resourceType,
workFlowMetaDir,
wfengineimpl.CreateDefaultWfMetaLoader,
wfengineimpl.CreateDefaultWorkflowHook,
wfengineimpl.GetDefaultMementoStorageFactory(resourceType, false),
)
return
}
3. 定义Resource
资源要实现以下接口才能配合状态机使用,所以需要对K8S资源做一下封装。
// 支持状态机的资源
type StateResource interface {
GetName() string
GetNamespace() string
Fetch() (StateResource, error)
GetState() State
UpdateState(State) (StateResource, error)
IsCancelled() bool
}
type MpdClusterResource struct {
implement.KubeResource
Logger logr.Logger
}
func (s *MpdClusterResource) GetState() statemachine.State {
return s.GetMpdCluster().Status.ClusterStatus
}
// UpdateState 更新资源当前状态(string)
func (s *MpdClusterResource) UpdateState(state statemachine.State) (statemachine.StateResource, error) {
so, err := s.fetch()
mpdCluster := so.Resource.(*v1.MPDCluster)
mpdCluster.Status.ClusterStatus = state
if mgr.GetSyncClient().Status().Update(context.TODO(), mpdCluster); err != nil {
s.Logger.Error(err, "update mpd cluster status error")
return nil, err
}
return so, nil
}
// 更新资源信息
func (s *MpdClusterResource) Update() error {
if err := mgr.GetSyncClient().Update(context.TODO(), s.GetMpdCluster()); err != nil {
s.Logger.Error(err, "update mpd cluster error")
return err
}
return nil
}
// Fetch 重新获取资源
func (s *MpdClusterResource) Fetch() (statemachine.StateResource, error) {
return s.fetch()
}
// GetScheme ...
func (s *MpdClusterResource) GetScheme() *runtime.Scheme {
return mgr.GetManager().GetScheme()
}
func (s *MpdClusterResource) IsCancelled() bool {
mpd, err := s.fetch()
if err != nil {
if apierrors.IsNotFound(err) {
return true
}
return false
}
return mpd.Resource.GetAnnotations()["cancelled"] == "true" || mpd.Resource.GetDeletionTimestamp() != nil
}
func (s *MpdClusterResource) fetch() (*MpdClusterResource, error) {
kubeRes := &v1.MPDCluster{}
err := mgr.GetSyncClient().Get(
context.TODO(), types.NamespacedName{Name: s.Resource.GetName(), Namespace: s.Resource.GetNamespace()}, kubeRes)
if err != nil {
s.Logger.Error(err, "mpd cluster not found")
return nil, err
}
return &MpdClusterResource{
KubeResource: implement.KubeResource{
Resource: kubeRes,
},
Logger: s.Logger,
}, nil
}
4. 定义StepBase
所有step都要实现以下接口:
type StepAction interface {
Init(map[string]interface{}, logr.Logger) error
DoStep(context.Context, logr.Logger) error
Output(logr.Logger) map[string]interface{}
}
可以实现一个StepBase基类,所有Step继承自StepBase:
type SharedStorageClusterStepBase struct {
wfengine.StepAction
Resource *v1.MPDCluster
Service *service.SharedStorageClusterService
Model *domain.SharedStorageCluster
}
func (s *SharedStorageClusterStepBase) Init(ctx map[string]interface{}, logger logr.Logger) error {
name := ctx[define.DefaultWfConf[wfdefine.WorkFlowResourceName]].(string)
ns := ctx[define.DefaultWfConf[wfdefine.WorkFlowResourceNameSpace]].(string)
kube := &v1.MPDCluster{}
err := mgr.GetSyncClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: ns}, kube)
if err != nil {
return err
}
s.Resource = kube
s.Service = business.NewSharedStorageClusterService(logger)
useModifyClass := false
if val, ok := ctx["modifyClass"]; ok {
useModifyClass = val.(bool)
}
useUpgradeVersion := false
if val, ok := ctx["upgrade"]; ok {
useUpgradeVersion = val.(bool)
}
s.Model = s.Service.GetByData(kube, useModifyClass, useUpgradeVersion)
return nil
}
func (s *SharedStorageClusterStepBase) DoStep(ctx context.Context, logger logr.Logger) error {
panic("implement me")
}
func (s *SharedStorageClusterStepBase) Output(logger logr.Logger) map[string]interface{} {
return map[string]interface{}{}
}
5. 定义Step
type InitMeta struct {
wf.SharedStorageClusterStepBase
}
func (step *InitMeta) DoStep(ctx context.Context, logger logr.Logger) error {
return step.Service.InitMeta(step.Model)
}
6. 注册Step
wfManager := GetSharedStorageClusterWfManager()
wfManager.RegisterStep(&InitMeta{})
7. 注册状态机
定义稳定态到非稳定态的转换检测函数
func checkInstall(obj statemachine.StateResource) (*statemachine.Event, error) {
cluster := obj.(*wf.MpdClusterResource).GetMpdCluster()
if cluster.Status.ClusterStatus == "Init" || cluster.Status.ClusterStatus == "" || string(cluster.Status.ClusterStatus) == string(statemachine.StateCreating) {
return statemachine.CreateEvent(statemachine.EventName(statemachine.StateCreating), nil), nil
}
return nil, nil
}
定义非稳定态入口函数
func installMainEnter(obj statemachine.StateResource) error {
resourceWf, err := wf.GetSharedStorageClusterWfManager().CreateResourceWorkflow(obj)
if err != nil {
return err
}
return resourceWf.CommonWorkFlowMainEnter(context.TODO(), obj, "CreateSharedStorageCluster", false, checkInstall)
}
注册稳定态、非稳定态、稳定态到非稳定态的转换检测函数、非稳定态入口函数
smIns := GetSharedStorageClusterStateMachine()
// 注册稳定态到非稳定态的转换检测及非稳定态的入口
smIns.RegisterStateTranslateMainEnter(statemachine.StateInit, checkInstall, statemachine.StateCreating, installMainEnter)
8. 配置流程元数据
flowName: CreateSharedStorageCluster
recoverFromFirstStep: false
steps:
- className: workflow_shared.InitMeta
stepName: InitMeta
- className: workflow_shared.PrepareStorage
stepName: PrepareStorage
- className: workflow_shared.CreateRwPod
stepName: CreateRwPod
- className: workflow_shared.CreateRoPods
stepName: CreateRoPods
- className: workflow_shared.CreateClusterManager
stepName: CreateClusterManager
- className: workflow_shared.AddToClusterManager
stepName: AddToClusterManager
- className: workflow_shared.UpdateRunningStatus
stepName: UpdateRunningStatus