使用工作流引擎

1. 创建StateMachine实例

var (
    sharedStorageClusterSmOnce       sync.Once
    sharedStorageClusterStateMachine *statemachine.StateMachine
)
func GetSharedStorageClusterStateMachine() *statemachine.StateMachine {
    sharedStorageClusterSmOnce.Do(func() {
        if sharedStorageClusterStateMachine == nil {
            sharedStorageClusterStateMachine = statemachine.CreateStateMachineInstance(ResourceType)
            sharedStorageClusterStateMachine.RegisterStableState(statemachine.StateRunning, statemachine.StateInterrupt, statemachine.StateInit)
        }
    })
    return sharedStorageClusterStateMachine
}

2. 创建WorkflowManager实例

var ResourceType    = "shared"
var WorkFlowMetaDir = ""./pkg/workflow""

var (
    sharedStorageClusterWfOnce    sync.Once
    sharedStorageClusterWfManager *wfengine.WfManager
)

func GetSharedStorageClusterWfManager() *wfengine.WfManager {
    sharedStorageClusterWfOnce.Do(func() {
        if sharedStorageClusterWfManager == nil {
            var err error
            sharedStorageClusterWfManager, err = createWfManager(ResourceType, WorkFlowMetaDir)
            sharedStorageClusterWfManager.RegisterRecover(wfengineimpl.CreateDefaultRecover())
            if err != nil {
                panic(fmt.Sprintf("create %s wf manager failed: %v", ResourceType, err))
            }
        }
    })
    return sharedStorageClusterWfManager
}

func createWfManager(resourceType, workFlowMetaDir string) (wfManager *wfengine.WfManager, err error) {
    wfManager, err = wfengine.CreateWfManager(
        resourceType,
        workFlowMetaDir,
        wfengineimpl.CreateDefaultWfMetaLoader,
        wfengineimpl.CreateDefaultWorkflowHook,
        wfengineimpl.GetDefaultMementoStorageFactory(resourceType, false),
    )
    return
}

3. 定义Resource

资源要实现以下接口才能配合状态机使用,所以需要对K8S资源做一下封装。

// 支持状态机的资源
type StateResource interface {
   GetName() string
   GetNamespace() string
   Fetch() (StateResource, error)
   GetState() State
   UpdateState(State) (StateResource, error)
   IsCancelled() bool
}
type MpdClusterResource struct {
    implement.KubeResource
    Logger logr.Logger
}

func (s *MpdClusterResource) GetState() statemachine.State {
    return s.GetMpdCluster().Status.ClusterStatus
}

// UpdateState 更新资源当前状态(string)
func (s *MpdClusterResource) UpdateState(state statemachine.State) (statemachine.StateResource, error) {
    so, err := s.fetch()
    mpdCluster := so.Resource.(*v1.MPDCluster)
    mpdCluster.Status.ClusterStatus = state
    if mgr.GetSyncClient().Status().Update(context.TODO(), mpdCluster); err != nil {
        s.Logger.Error(err, "update mpd cluster status error")
        return nil, err
    }
    return so, nil
}

// 更新资源信息
func (s *MpdClusterResource) Update() error {
    if err := mgr.GetSyncClient().Update(context.TODO(), s.GetMpdCluster()); err != nil {
        s.Logger.Error(err, "update mpd cluster error")
        return err
    }
    return nil
}

// Fetch 重新获取资源
func (s *MpdClusterResource) Fetch() (statemachine.StateResource, error) {
    return s.fetch()
}

// GetScheme ...
func (s *MpdClusterResource) GetScheme() *runtime.Scheme {
    return mgr.GetManager().GetScheme()
}

func (s *MpdClusterResource) IsCancelled() bool {
    mpd, err := s.fetch()
    if err != nil {
        if apierrors.IsNotFound(err) {
            return true
        }
        return false
    }
    return mpd.Resource.GetAnnotations()["cancelled"] == "true" || mpd.Resource.GetDeletionTimestamp() != nil
}

func (s *MpdClusterResource) fetch() (*MpdClusterResource, error) {
    kubeRes := &v1.MPDCluster{}
    err := mgr.GetSyncClient().Get(
        context.TODO(), types.NamespacedName{Name: s.Resource.GetName(), Namespace: s.Resource.GetNamespace()}, kubeRes)
    if err != nil {
        s.Logger.Error(err, "mpd cluster not found")
        return nil, err
    }
    return &MpdClusterResource{
        KubeResource: implement.KubeResource{
            Resource: kubeRes,
        },
        Logger: s.Logger,
    }, nil
}

4. 定义StepBase

所有step都要实现以下接口:

type StepAction interface {
   Init(map[string]interface{}, logr.Logger) error
   DoStep(context.Context, logr.Logger) error
   Output(logr.Logger) map[string]interface{}
}

可以实现一个StepBase基类,所有Step继承自StepBase:

type SharedStorageClusterStepBase struct {
    wfengine.StepAction
    Resource *v1.MPDCluster
    Service  *service.SharedStorageClusterService
    Model    *domain.SharedStorageCluster
}

func (s *SharedStorageClusterStepBase) Init(ctx map[string]interface{}, logger logr.Logger) error {
    name := ctx[define.DefaultWfConf[wfdefine.WorkFlowResourceName]].(string)
    ns := ctx[define.DefaultWfConf[wfdefine.WorkFlowResourceNameSpace]].(string)

    kube := &v1.MPDCluster{}
    err := mgr.GetSyncClient().Get(context.TODO(), types.NamespacedName{Name: name, Namespace: ns}, kube)
    if err != nil {
        return err
    }
    s.Resource = kube
    s.Service = business.NewSharedStorageClusterService(logger)
    useModifyClass := false
    if val, ok := ctx["modifyClass"]; ok {
        useModifyClass = val.(bool)
    }
    useUpgradeVersion := false
    if val, ok := ctx["upgrade"]; ok {
        useUpgradeVersion = val.(bool)
    }
    s.Model = s.Service.GetByData(kube, useModifyClass, useUpgradeVersion)
    return nil
}

func (s *SharedStorageClusterStepBase) DoStep(ctx context.Context, logger logr.Logger) error {
    panic("implement me")
}

func (s *SharedStorageClusterStepBase) Output(logger logr.Logger) map[string]interface{} {
    return map[string]interface{}{}
}

5. 定义Step

type InitMeta struct {
    wf.SharedStorageClusterStepBase
}

func (step *InitMeta) DoStep(ctx context.Context, logger logr.Logger) error {
    return step.Service.InitMeta(step.Model)
}

6. 注册Step

    wfManager := GetSharedStorageClusterWfManager()
    wfManager.RegisterStep(&InitMeta{})

7. 注册状态机

定义稳定态到非稳定态的转换检测函数

func checkInstall(obj statemachine.StateResource) (*statemachine.Event, error) {
    cluster := obj.(*wf.MpdClusterResource).GetMpdCluster()
    if cluster.Status.ClusterStatus == "Init" || cluster.Status.ClusterStatus == "" || string(cluster.Status.ClusterStatus) == string(statemachine.StateCreating) {
        return statemachine.CreateEvent(statemachine.EventName(statemachine.StateCreating), nil), nil
    }
    return nil, nil
}

定义非稳定态入口函数

func installMainEnter(obj statemachine.StateResource) error {
    resourceWf, err := wf.GetSharedStorageClusterWfManager().CreateResourceWorkflow(obj)
    if err != nil {
        return err
    }
    return resourceWf.CommonWorkFlowMainEnter(context.TODO(), obj, "CreateSharedStorageCluster", false, checkInstall)
}

注册稳定态、非稳定态、稳定态到非稳定态的转换检测函数、非稳定态入口函数

    smIns := GetSharedStorageClusterStateMachine()

    // 注册稳定态到非稳定态的转换检测及非稳定态的入口
    smIns.RegisterStateTranslateMainEnter(statemachine.StateInit, checkInstall, statemachine.StateCreating, installMainEnter)

8. 配置流程元数据

flowName: CreateSharedStorageCluster
recoverFromFirstStep: false
steps:
  - className: workflow_shared.InitMeta
    stepName:  InitMeta

  - className: workflow_shared.PrepareStorage
    stepName:  PrepareStorage

  - className: workflow_shared.CreateRwPod
    stepName:  CreateRwPod

  - className: workflow_shared.CreateRoPods
    stepName:  CreateRoPods

  - className: workflow_shared.CreateClusterManager
    stepName:  CreateClusterManager

  - className: workflow_shared.AddToClusterManager
    stepName:  AddToClusterManager

  - className: workflow_shared.UpdateRunningStatus
    stepName:  UpdateRunningStatus