Featured image of post 如何实现一个CSI?源码解析csi-nfs

如何实现一个CSI?源码解析csi-nfs

如何实现一个CSI?

CSI是一个与存储系统交互的接口标准,它允许存储系统的供应商编写一个独立的插件,以便将其存储系统与Kubernetes集成。

本文将参考开源项目,介绍如何实现一个CSI。

本文所使用环境:

  • k8s版本:v1.27
  • csi-driver-nfs版本:v4.6.0

nfs csi项目地址: https://github.com/kubernetes-csi/csi-driver-nfs.git

pod调度过程查看文章《从源码解析KubeScheduler调度过程》

Attachdetach控制器查看文章《Attachdetach控制器源码解析》

如何创建一个CSI并部署到k8s中?

首先我们向集群中部署csi-nfs, 这是一个nfs的csi实现。

blog|main ⇒ curl -skSL https://raw.githubusercontent.com/kubernetes-csi/csi-driver-nfs/master/deploy/install-driver.sh | bash -s master --
Installing NFS CSI driver, version: master ...
serviceaccount/csi-nfs-controller-sa created
serviceaccount/csi-nfs-node-sa created
clusterrole.rbac.authorization.k8s.io/nfs-external-provisioner-role created
clusterrolebinding.rbac.authorization.k8s.io/nfs-csi-provisioner-binding created
csidriver.storage.k8s.io/nfs.csi.k8s.io created
deployment.apps/csi-nfs-controller created
daemonset.apps/csi-nfs-node created
NFS CSI driver installed successfully.

从部署输出中可以发现, 实现一个csi并部署到k8s需要以下组件:

  • 开发
    • controller(deployment.apps/csi-nfs-controller) 实现csi控制器,部署在server。
    • node plugin(deamonset.apps/csi-nfs-node) 实现存储交互逻辑, 部署在node节点。
  • 部署
    • sa、rbac 用于授权。
    • csidriver 用于向k8s注册csi driver,声明相关信息。
    • deployment、daemonset 用于部署controller和app。

csidriver声明信息如下(sa和rbac所有插件大同小异,这里不展开):

apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"storage.k8s.io/v1","kind":"CSIDriver","metadata":{"annotations":{},"name":"nfs.csi.k8s.io"},"spec":{"attachRequired":false,"fsGroupPolicy":"File","volumeLifecycleModes":["Persistent"]}}      
  creationTimestamp: "2024-02-23T02:31:21Z"
  name: nfs.csi.k8s.io
  resourceVersion: "2505462"
  uid: 80952849-8a1b-4d10-b2cd-ea9eba3c2a8e
spec:
  attachRequired: false # 是否支持附加操作。 nfs因为是文件系统,不需要向系统挂载块设备, 所以是false。cephfs等块存储是true。
  fsGroupPolicy: File # 文件系统组策略。定义底层卷是否支持在挂载之前更改卷的所有权和权限。
  podInfoOnMount: false # 是否在挂载时提供pod信息。如果为true,则在挂载时将pod信息(如podName, podUID)传递给CSI驱动程序。
  requiresRepublish: false # 是否需要重新发布。如果为true,NodePublishVolume将被周期性地调用。
  storageCapacity: false # 是否支持存储容量查询。如果为true,存储容量的评估将参与到调度过程中。
  # seLinuxMount: boolen  指定 CSI 驱动是否支持 "-o context" 挂载选项
  # tokenRequests: []TokenRequest 用于指定 CSI 驱动程序所需的令牌请求
  volumeLifecycleModes: # 支持的卷生命周期模式
  - Persistent # 持久化卷
  # - Ephemeral 临时卷, 与pod生命周期关联

还有另外两个与csi相关的资源对象,上述部署回显中没有体现:

  • CSINode 包含节点上安装的所有 CSI 驱动有关的信息, 以及topologyKeys(用于拓扑调度)
  • CSIStorageCapacities 描述指定CSI特定拓扑段中可用的容量, 需要csi支持storageCapacity才能生效。

csi sidecar

为了能使开发人员专注实现csi中“个性”的逻辑, k8s将与api交互的通用逻辑变成了sidecar, 大大简化了CSI驱动程序的开发和部署。

这些sidecar可以带来如下好处:

  • 减少“样板”代码。CSI 驱动程序开发人员不必担心复杂的“Kubernetes 特定”代码。
  • 分离。与 Kubernetes API 交互的代码与实现 CSI 接口的代码隔离(并且位于不同的容器中)。

举例常用sidecar实现:

  • external-provisioner。监听PersistentVolumeClaim,并调用csi CreateVolume创建新的卷。
  • external-attache。监听VolumeAttachment,并调用Controller[Publish|Unpublish]Volume挂载设备到主机。
  • external-resizer。监听PersistentVolumeClaim,并调用ControllerExpandVolume进行卷扩容。
  • external-snapshotter。监听VolumeSnapshotVolumeSnapshotContent,并调用CreateSnapshot, DeleteSnapshot, and ListSnapshots 进行快照操作
  • livenessprobe。存活检测
  • node-driver-registrar。向kubelet注册插件。

我们查看csi-nfs的部署文件中,使用了哪些sidecar(yaml内容省略了无用信息):

# kubectl get deployments.apps -n kube-system csi-nfs-controller -o yaml
    containers:
      - image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.0
        name: csi-provisioner
      - image: registry.k8s.io/sig-storage/csi-snapshotter:v6.3.3
        name: csi-snapshotter
      - image: registry.k8s.io/sig-storage/livenessprobe:v2.12.0
        name: liveness-probe
      - image: gcr.io/k8s-staging-sig-storage/nfsplugin:canary
        name: nfs
---
# kubectl get daemonsets.apps -n kube-system csi-nfs-node -o yaml
    containers:
      - image: registry.k8s.io/sig-storage/livenessprobe:v2.12.0
        name: liveness-probe
      - image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.0
        name: node-driver-registrar
      - image: gcr.io/k8s-staging-sig-storage/nfsplugin:canary
        name: nfs

csi api定义

csi NodeServer 需要实现以下API:

// for grpc
type NodeServer interface {
    // 对于块设备的挂载与卸载
	NodeStageVolume(context.Context, *NodeStageVolumeRequest) (*NodeStageVolumeResponse, error)
	NodeUnstageVolume(context.Context, *NodeUnstageVolumeRequest) (*NodeUnstageVolumeResponse, error)
    // 文件系统的挂载与卸载
	NodePublishVolume(context.Context, *NodePublishVolumeRequest) (*NodePublishVolumeResponse, error)
	NodeUnpublishVolume(context.Context, *NodeUnpublishVolumeRequest) (*NodeUnpublishVolumeResponse, error)
    // 状态获取
	NodeGetVolumeStats(context.Context, *NodeGetVolumeStatsRequest) (*NodeGetVolumeStatsResponse, error)
    // 扩容
	NodeExpandVolume(context.Context, *NodeExpandVolumeRequest) (*NodeExpandVolumeResponse, error)
    // 容量获取
	NodeGetCapabilities(context.Context, *NodeGetCapabilitiesRequest) (*NodeGetCapabilitiesResponse, error)
    // 基本信息
	NodeGetInfo(context.Context, *NodeGetInfoRequest) (*NodeGetInfoResponse, error)
}
// grpc服务会调用此具体实现
type Interface interface {
	// 挂载指定路径,不可包含敏感项
	Mount(source string, target string, fstype string, options []string) error
	// 与mount一致, 但是可以包含敏感项(如密码)并不会被记录
	MountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error
	// 与MountSensitive一致,但禁用systemd
	MountSensitiveWithoutSystemd(source string, target string, fstype string, options []string, sensitiveOptions []string) error
	// 与MountSensitiveWithoutSystemd一致, 但包含附加选项
	MountSensitiveWithoutSystemdWithMountFlags(source string, target string, fstype string, options []string, sensitiveOptions []string, mountFlags []string) error
	// 卸载
	Unmount(target string) error
	// 返回挂载点列表
	List() ([]MountPoint, error)
	// 判断是否为挂载点, 此方法无法检测linux绑定挂载和符号连接, 但是速度比List更快
	IsLikelyNotMountPoint(file string) (bool, error)
	// 执行挂载检查
	CanSafelySkipMountPointCheck() bool
    // 判断是否为挂载点,此方法可以枚举list中所有对象进行插件,但是比IsLikelyNotMountPoint消耗更大
	IsMountPoint(file string) (bool, error)
	// 返回与目标路径关联的所有挂载点(在windows上不能检测到所有挂载点)
    // 如 /dev/sdc 挂载在/path/a 和 /path/b
	// GetMountRefs("/path/a") would return ["/path/b"]
	// GetMountRefs("/path/b") would return ["/path/a"]
	GetMountRefs(pathname string) ([]string, error)
}

csi ControllerServer需要实现以下API:

type ControllerServer interface {
    // 创建/删除卷
	CreateVolume(context.Context, *CreateVolumeRequest) (*CreateVolumeResponse, error)
	DeleteVolume(context.Context, *DeleteVolumeRequest) (*DeleteVolumeResponse, error)
    // 挂载/卸载卷
	ControllerPublishVolume(context.Context, *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
	ControllerUnpublishVolume(context.Context, *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
    // 检查卷合法性
	ValidateVolumeCapabilities(context.Context, *ValidateVolumeCapabilitiesRequest) (*ValidateVolumeCapabilitiesResponse, error)
    // 卷列表
	ListVolumes(context.Context, *ListVolumesRequest) (*ListVolumesResponse, error)
    // 获取容量
	GetCapacity(context.Context, *GetCapacityRequest) (*GetCapacityResponse, error)
    // 获取支持的功能
	ControllerGetCapabilities(context.Context, *ControllerGetCapabilitiesRequest) (*ControllerGetCapabilitiesResponse, error)
    // 创建/删除/查询快照
	CreateSnapshot(context.Context, *CreateSnapshotRequest) (*CreateSnapshotResponse, error)
	DeleteSnapshot(context.Context, *DeleteSnapshotRequest) (*DeleteSnapshotResponse, error)
	ListSnapshots(context.Context, *ListSnapshotsRequest) (*ListSnapshotsResponse, error)
    // 扩容
	ControllerExpandVolume(context.Context, *ControllerExpandVolumeRequest) (*ControllerExpandVolumeResponse, error)
	ControllerGetVolume(context.Context, *ControllerGetVolumeRequest) (*ControllerGetVolumeResponse, error)
}

IdentityServer需要实现以下API:

type IdentityServer interface {
    // 插件信息
	GetPluginInfo(context.Context, *GetPluginInfoRequest) (*GetPluginInfoResponse, error)
    // 获取插件功能
	GetPluginCapabilities(context.Context, *GetPluginCapabilitiesRequest) (*GetPluginCapabilitiesResponse, error)
    // 探活
	Probe(context.Context, *ProbeRequest) (*ProbeResponse, error)
}

csi-nfs源码解析

有了sidecar的帮助, 我们只需要专注于实现自己的逻辑就好了

运行流程

main入口初始化Driver对象, Driver包含了nfs相关信息以及NodeServer。 Driver.Run方法将会启动服务。

func handle() {
	driverOptions := nfs.DriverOptions{
        //...
	}
	d := nfs.NewDriver(&driverOptions)
	d.Run(false)
}
type Driver struct {
    // 相关信息,包括nfs节点等
	name                  string
	nodeID                string
	version               string
	endpoint              string
	mountPermissions      uint64
	workingMountDir       string
	defaultOnDeletePolicy string

	ns          *NodeServer
    // 支持的选项
	cscap       []*csi.ControllerServiceCapability
	nscap       []*csi.NodeServiceCapability
	volumeLocks *VolumeLocks

	volStatsCache                azcache.Resource
	volStatsCacheExpireInMinutes int
}
func (n *Driver) Run(testMode bool) {
	// ...
    // run方法将启动三个server: nodeserver、indentifyserver、controllerserver
    // 三个server的定义上文有描述,实际这三个server服务于不同的角色:
    // nodeserver + indentifyserver 部署在node节点上负责操作文件系统以及心跳
    // controllerserver + indentifyserver 服务端用于接收来自k8s的请求并控制nodeserver
	n.ns = NewNodeServer(n, mounter)
	s := NewNonBlockingGRPCServer()
	s.Start(n.endpoint,
		NewDefaultIdentityServer(n), // 添加IdentityServer
		NewControllerServer(n), // 添加ControllerServer
		n.ns, // 添加NodeServer
		testMode)
	s.Wait()
}

nfs的实现比较简单, 所以没有把controller和nodeserver分成两个入口。

服务启动后, 根据部署文件中添加的sidecar,会调用对应的接口, 大体运行流程如下:

代码实现

因为不同基础存储的实现不一样, 所以仅挑选“创建流程”用到的函数作为了解。

func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	name := req.GetName()
	if len(name) == 0 {
		return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
	}
    // 判断是否有不支持的选项
	if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	mountPermissions := cs.Driver.mountPermissions
    // 请求的容量
	reqCapacity := req.GetCapacityRange().GetRequiredBytes()
    // 当前插件的自定义参数
	parameters := req.GetParameters()
	if parameters == nil {
		parameters = make(map[string]string)
	}
	// validate parameters (case-insensitive)
	for k, v := range parameters {
		switch strings.ToLower(k) {
		case paramServer:
		case paramShare:
		case paramSubDir:
		case paramOnDelete:
		case pvcNamespaceKey:
		case pvcNameKey:
		case pvNameKey:
			// no op
		case mountPermissionsField:
			if v != "" {
				var err error
                //  判断权限挂载的是否合法
				if mountPermissions, err = strconv.ParseUint(v, 8, 32); err != nil {
					return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
				}
			}
		default:
			return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
		}
	}
    // 返回处理后的,当前请求的相关信息,包括server、ondelete等参数
	nfsVol, err := newNFSVolume(name, reqCapacity, parameters, cs.Driver.defaultOnDeletePolicy)

	var volCap *csi.VolumeCapability
	if len(req.GetVolumeCapabilities()) > 0 {
		volCap = req.GetVolumeCapabilities()[0]
	}
	// 挂载nfs基本共享,以便我们可以创建一个子目录。 其中会调用nodeserver服务
	if err = cs.internalMount(ctx, nfsVol, parameters, volCap); err != nil {
		return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
	}
	defer func() {
		if err = cs.internalUnmount(ctx, nfsVol); err != nil {
			klog.Warningf("failed to unmount nfs server: %v", err.Error())
		}
	}()

	// 在base-dir下创建子目录
	internalVolumePath := getInternalVolumePath(cs.Driver.workingMountDir, nfsVol)
	if err = os.MkdirAll(internalVolumePath, 0777); err != nil {
		return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
	}

	if mountPermissions > 0 {
		// 设置目录权限
		if err = os.Chmod(internalVolumePath, os.FileMode(mountPermissions)); err != nil {
			klog.Warningf("failed to chmod subdirectory: %v", err.Error())
		}
	}

	if req.GetVolumeContentSource() != nil {
		if err := cs.copyVolume(ctx, req, nfsVol); err != nil {
			return nil, err
		}
	}

	setKeyValueInMap(parameters, paramSubDir, nfsVol.subDir)
	return &csi.CreateVolumeResponse{
		Volume: &csi.Volume{
			VolumeId:      nfsVol.id,
			CapacityBytes: 0, // by setting it to zero, Provisioner will use PVC requested size as PV size
			VolumeContext: parameters,
			ContentSource: req.GetVolumeContentSource(),
		},
	}, nil
}