如何实现一个CSI?
CSI
是一个与存储系统交互的接口标准,它允许存储系统的供应商编写一个独立的插件,以便将其存储系统与Kubernetes集成。
本文将参考开源项目,介绍如何实现一个CSI。
本文所使用环境:
- k8s版本:v1.27
- csi-driver-nfs版本:v4.6.0
nfs csi项目地址: https://github.com/kubernetes-csi/csi-driver-nfs.git
pod调度过程查看文章《从源码解析KubeScheduler调度过程》
Attachdetach控制器查看文章《Attachdetach控制器源码解析》
如何创建一个CSI并部署到k8s中?
首先我们向集群中部署csi-nfs
, 这是一个nfs的csi实现。
blog|main ⇒ curl -skSL https://raw.githubusercontent.com/kubernetes-csi/csi-driver-nfs/master/deploy/install-driver.sh | bash -s master --
Installing NFS CSI driver, version: master ...
serviceaccount/csi-nfs-controller-sa created
serviceaccount/csi-nfs-node-sa created
clusterrole.rbac.authorization.k8s.io/nfs-external-provisioner-role created
clusterrolebinding.rbac.authorization.k8s.io/nfs-csi-provisioner-binding created
csidriver.storage.k8s.io/nfs.csi.k8s.io created
deployment.apps/csi-nfs-controller created
daemonset.apps/csi-nfs-node created
NFS CSI driver installed successfully.
从部署输出中可以发现, 实现一个csi并部署到k8s需要以下组件:
- 开发
- controller(deployment.apps/csi-nfs-controller) 实现csi控制器,部署在server。
- node plugin(deamonset.apps/csi-nfs-node) 实现存储交互逻辑, 部署在node节点。
- 部署
- sa、rbac 用于授权。
- csidriver 用于向k8s注册csi driver,声明相关信息。
- deployment、daemonset 用于部署controller和app。
csidriver
声明信息如下(sa和rbac所有插件大同小异,这里不展开):
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"storage.k8s.io/v1","kind":"CSIDriver","metadata":{"annotations":{},"name":"nfs.csi.k8s.io"},"spec":{"attachRequired":false,"fsGroupPolicy":"File","volumeLifecycleModes":["Persistent"]}}
creationTimestamp: "2024-02-23T02:31:21Z"
name: nfs.csi.k8s.io
resourceVersion: "2505462"
uid: 80952849-8a1b-4d10-b2cd-ea9eba3c2a8e
spec:
attachRequired: false # 是否支持附加操作。 nfs因为是文件系统,不需要向系统挂载块设备, 所以是false。cephfs等块存储是true。
fsGroupPolicy: File # 文件系统组策略。定义底层卷是否支持在挂载之前更改卷的所有权和权限。
podInfoOnMount: false # 是否在挂载时提供pod信息。如果为true,则在挂载时将pod信息(如podName, podUID)传递给CSI驱动程序。
requiresRepublish: false # 是否需要重新发布。如果为true,NodePublishVolume将被周期性地调用。
storageCapacity: false # 是否支持存储容量查询。如果为true,存储容量的评估将参与到调度过程中。
# seLinuxMount: boolen 指定 CSI 驱动是否支持 "-o context" 挂载选项
# tokenRequests: []TokenRequest 用于指定 CSI 驱动程序所需的令牌请求
volumeLifecycleModes: # 支持的卷生命周期模式
- Persistent # 持久化卷
# - Ephemeral 临时卷, 与pod生命周期关联
还有另外两个与
csi
相关的资源对象,上述部署回显中没有体现:
- CSINode 包含节点上安装的所有 CSI 驱动有关的信息, 以及topologyKeys(用于拓扑调度)
- CSIStorageCapacities 描述指定CSI特定拓扑段中可用的容量, 需要csi支持
storageCapacity
才能生效。
csi sidecar
为了能使开发人员专注实现csi中“个性”的逻辑, k8s
将与api
交互的通用逻辑变成了sidecar
, 大大简化了CSI驱动程序的开发和部署。
这些sidecar可以带来如下好处:
- 减少“样板”代码。CSI 驱动程序开发人员不必担心复杂的“Kubernetes 特定”代码。
- 分离。与 Kubernetes API 交互的代码与实现 CSI 接口的代码隔离(并且位于不同的容器中)。
举例常用sidecar实现:
- external-provisioner。监听
PersistentVolumeClaim
,并调用csi CreateVolume
创建新的卷。 - external-attache。监听
VolumeAttachment
,并调用Controller[Publish|Unpublish]Volume
挂载设备到主机。 - external-resizer。监听
PersistentVolumeClaim
,并调用ControllerExpandVolume
进行卷扩容。 - external-snapshotter。监听
VolumeSnapshot
和VolumeSnapshotContent
,并调用CreateSnapshot, DeleteSnapshot, and ListSnapshots
进行快照操作 - livenessprobe。存活检测
- node-driver-registrar。向
kubelet
注册插件。
我们查看csi-nfs
的部署文件中,使用了哪些sidecar(yaml内容省略了无用信息):
# kubectl get deployments.apps -n kube-system csi-nfs-controller -o yaml
containers:
- image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.0
name: csi-provisioner
- image: registry.k8s.io/sig-storage/csi-snapshotter:v6.3.3
name: csi-snapshotter
- image: registry.k8s.io/sig-storage/livenessprobe:v2.12.0
name: liveness-probe
- image: gcr.io/k8s-staging-sig-storage/nfsplugin:canary
name: nfs
---
# kubectl get daemonsets.apps -n kube-system csi-nfs-node -o yaml
containers:
- image: registry.k8s.io/sig-storage/livenessprobe:v2.12.0
name: liveness-probe
- image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.0
name: node-driver-registrar
- image: gcr.io/k8s-staging-sig-storage/nfsplugin:canary
name: nfs
csi api定义
csi NodeServer 需要实现以下API:
// for grpc
type NodeServer interface {
// 对于块设备的挂载与卸载
NodeStageVolume(context.Context, *NodeStageVolumeRequest) (*NodeStageVolumeResponse, error)
NodeUnstageVolume(context.Context, *NodeUnstageVolumeRequest) (*NodeUnstageVolumeResponse, error)
// 文件系统的挂载与卸载
NodePublishVolume(context.Context, *NodePublishVolumeRequest) (*NodePublishVolumeResponse, error)
NodeUnpublishVolume(context.Context, *NodeUnpublishVolumeRequest) (*NodeUnpublishVolumeResponse, error)
// 状态获取
NodeGetVolumeStats(context.Context, *NodeGetVolumeStatsRequest) (*NodeGetVolumeStatsResponse, error)
// 扩容
NodeExpandVolume(context.Context, *NodeExpandVolumeRequest) (*NodeExpandVolumeResponse, error)
// 容量获取
NodeGetCapabilities(context.Context, *NodeGetCapabilitiesRequest) (*NodeGetCapabilitiesResponse, error)
// 基本信息
NodeGetInfo(context.Context, *NodeGetInfoRequest) (*NodeGetInfoResponse, error)
}
// grpc服务会调用此具体实现
type Interface interface {
// 挂载指定路径,不可包含敏感项
Mount(source string, target string, fstype string, options []string) error
// 与mount一致, 但是可以包含敏感项(如密码)并不会被记录
MountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error
// 与MountSensitive一致,但禁用systemd
MountSensitiveWithoutSystemd(source string, target string, fstype string, options []string, sensitiveOptions []string) error
// 与MountSensitiveWithoutSystemd一致, 但包含附加选项
MountSensitiveWithoutSystemdWithMountFlags(source string, target string, fstype string, options []string, sensitiveOptions []string, mountFlags []string) error
// 卸载
Unmount(target string) error
// 返回挂载点列表
List() ([]MountPoint, error)
// 判断是否为挂载点, 此方法无法检测linux绑定挂载和符号连接, 但是速度比List更快
IsLikelyNotMountPoint(file string) (bool, error)
// 执行挂载检查
CanSafelySkipMountPointCheck() bool
// 判断是否为挂载点,此方法可以枚举list中所有对象进行插件,但是比IsLikelyNotMountPoint消耗更大
IsMountPoint(file string) (bool, error)
// 返回与目标路径关联的所有挂载点(在windows上不能检测到所有挂载点)
// 如 /dev/sdc 挂载在/path/a 和 /path/b
// GetMountRefs("/path/a") would return ["/path/b"]
// GetMountRefs("/path/b") would return ["/path/a"]
GetMountRefs(pathname string) ([]string, error)
}
csi ControllerServer需要实现以下API:
type ControllerServer interface {
// 创建/删除卷
CreateVolume(context.Context, *CreateVolumeRequest) (*CreateVolumeResponse, error)
DeleteVolume(context.Context, *DeleteVolumeRequest) (*DeleteVolumeResponse, error)
// 挂载/卸载卷
ControllerPublishVolume(context.Context, *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
ControllerUnpublishVolume(context.Context, *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
// 检查卷合法性
ValidateVolumeCapabilities(context.Context, *ValidateVolumeCapabilitiesRequest) (*ValidateVolumeCapabilitiesResponse, error)
// 卷列表
ListVolumes(context.Context, *ListVolumesRequest) (*ListVolumesResponse, error)
// 获取容量
GetCapacity(context.Context, *GetCapacityRequest) (*GetCapacityResponse, error)
// 获取支持的功能
ControllerGetCapabilities(context.Context, *ControllerGetCapabilitiesRequest) (*ControllerGetCapabilitiesResponse, error)
// 创建/删除/查询快照
CreateSnapshot(context.Context, *CreateSnapshotRequest) (*CreateSnapshotResponse, error)
DeleteSnapshot(context.Context, *DeleteSnapshotRequest) (*DeleteSnapshotResponse, error)
ListSnapshots(context.Context, *ListSnapshotsRequest) (*ListSnapshotsResponse, error)
// 扩容
ControllerExpandVolume(context.Context, *ControllerExpandVolumeRequest) (*ControllerExpandVolumeResponse, error)
ControllerGetVolume(context.Context, *ControllerGetVolumeRequest) (*ControllerGetVolumeResponse, error)
}
IdentityServer需要实现以下API:
type IdentityServer interface {
// 插件信息
GetPluginInfo(context.Context, *GetPluginInfoRequest) (*GetPluginInfoResponse, error)
// 获取插件功能
GetPluginCapabilities(context.Context, *GetPluginCapabilitiesRequest) (*GetPluginCapabilitiesResponse, error)
// 探活
Probe(context.Context, *ProbeRequest) (*ProbeResponse, error)
}
csi-nfs源码解析
有了sidecar的帮助, 我们只需要专注于实现自己的逻辑就好了
运行流程
main
入口初始化Driver
对象, Driver
包含了nfs相关信息以及NodeServer。 Driver.Run
方法将会启动服务。
func handle() {
driverOptions := nfs.DriverOptions{
//...
}
d := nfs.NewDriver(&driverOptions)
d.Run(false)
}
type Driver struct {
// 相关信息,包括nfs节点等
name string
nodeID string
version string
endpoint string
mountPermissions uint64
workingMountDir string
defaultOnDeletePolicy string
ns *NodeServer
// 支持的选项
cscap []*csi.ControllerServiceCapability
nscap []*csi.NodeServiceCapability
volumeLocks *VolumeLocks
volStatsCache azcache.Resource
volStatsCacheExpireInMinutes int
}
func (n *Driver) Run(testMode bool) {
// ...
// run方法将启动三个server: nodeserver、indentifyserver、controllerserver
// 三个server的定义上文有描述,实际这三个server服务于不同的角色:
// nodeserver + indentifyserver 部署在node节点上负责操作文件系统以及心跳
// controllerserver + indentifyserver 服务端用于接收来自k8s的请求并控制nodeserver
n.ns = NewNodeServer(n, mounter)
s := NewNonBlockingGRPCServer()
s.Start(n.endpoint,
NewDefaultIdentityServer(n), // 添加IdentityServer
NewControllerServer(n), // 添加ControllerServer
n.ns, // 添加NodeServer
testMode)
s.Wait()
}
nfs的实现比较简单, 所以没有把controller和nodeserver分成两个入口。
服务启动后, 根据部署文件中添加的sidecar,会调用对应的接口, 大体运行流程如下:
代码实现
因为不同基础存储的实现不一样, 所以仅挑选“创建流程”用到的函数作为了解。
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
name := req.GetName()
if len(name) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
}
// 判断是否有不支持的选项
if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
mountPermissions := cs.Driver.mountPermissions
// 请求的容量
reqCapacity := req.GetCapacityRange().GetRequiredBytes()
// 当前插件的自定义参数
parameters := req.GetParameters()
if parameters == nil {
parameters = make(map[string]string)
}
// validate parameters (case-insensitive)
for k, v := range parameters {
switch strings.ToLower(k) {
case paramServer:
case paramShare:
case paramSubDir:
case paramOnDelete:
case pvcNamespaceKey:
case pvcNameKey:
case pvNameKey:
// no op
case mountPermissionsField:
if v != "" {
var err error
// 判断权限挂载的是否合法
if mountPermissions, err = strconv.ParseUint(v, 8, 32); err != nil {
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
}
}
default:
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
}
}
// 返回处理后的,当前请求的相关信息,包括server、ondelete等参数
nfsVol, err := newNFSVolume(name, reqCapacity, parameters, cs.Driver.defaultOnDeletePolicy)
var volCap *csi.VolumeCapability
if len(req.GetVolumeCapabilities()) > 0 {
volCap = req.GetVolumeCapabilities()[0]
}
// 挂载nfs基本共享,以便我们可以创建一个子目录。 其中会调用nodeserver服务
if err = cs.internalMount(ctx, nfsVol, parameters, volCap); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
}
defer func() {
if err = cs.internalUnmount(ctx, nfsVol); err != nil {
klog.Warningf("failed to unmount nfs server: %v", err.Error())
}
}()
// 在base-dir下创建子目录
internalVolumePath := getInternalVolumePath(cs.Driver.workingMountDir, nfsVol)
if err = os.MkdirAll(internalVolumePath, 0777); err != nil {
return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
}
if mountPermissions > 0 {
// 设置目录权限
if err = os.Chmod(internalVolumePath, os.FileMode(mountPermissions)); err != nil {
klog.Warningf("failed to chmod subdirectory: %v", err.Error())
}
}
if req.GetVolumeContentSource() != nil {
if err := cs.copyVolume(ctx, req, nfsVol); err != nil {
return nil, err
}
}
setKeyValueInMap(parameters, paramSubDir, nfsVol.subDir)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: nfsVol.id,
CapacityBytes: 0, // by setting it to zero, Provisioner will use PVC requested size as PV size
VolumeContext: parameters,
ContentSource: req.GetVolumeContentSource(),
},
}, nil
}