Skip to content

Commit 2a8934f

Browse files
committed
rewrite copyvolume
1 parent dc158c8 commit 2a8934f

File tree

3 files changed

+295
-63
lines changed

3 files changed

+295
-63
lines changed

pkg/blob/blob.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ var (
162162
supportedProtocolList = []string{Fuse, Fuse2, NFS, AZNFS}
163163
retriableErrors = []string{accountNotProvisioned, tooManyRequests, statusCodeNotFound, containerBeingDeletedDataplaneAPIError, containerBeingDeletedManagementAPIError, clientThrottled}
164164
supportedFSGroupChangePolicyList = []string{FSGroupChangeNone, string(v1.FSGroupChangeAlways), string(v1.FSGroupChangeOnRootMismatch)}
165+
166+
// azcopyCloneVolumeOptions used in volume cloning between different storage account and --check-length to false because volume data may be in changing state, copy volume is not same as current source volume,
167+
// set --s2s-preserve-access-tier=false to avoid BlobAccessTierNotSupportedForAccountType error in azcopy
168+
azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false", "--s2s-preserve-access-tier=false"}
165169
)
166170

167171
// DriverOptions defines driver parameters specified in driver deployment

pkg/blob/controllerserver.go

Lines changed: 93 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -366,36 +366,33 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
366366
defer d.volumeLocks.Release(volName)
367367

368368
requestName := "controller_create_volume"
369-
370-
var srcAzcopyAuthEnv []string
371-
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
372369
if volContentSource != nil {
373370
switch volContentSource.Type.(type) {
374371
case *csi.VolumeContentSource_Snapshot:
375-
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
372+
requestName = "controller_create_volume_from_snapshot"
376373
case *csi.VolumeContentSource_Volume:
377374
requestName = "controller_create_volume_from_volume"
378-
var srcVolumeID string
379-
if volContentSource.GetVolume() != nil {
380-
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
381-
}
382-
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
383-
if err != nil {
384-
return nil, status.Error(codes.NotFound, err.Error())
385-
}
386-
srcAccountOptions := &azure.AccountOptions{
387-
Name: srcAccountName,
388-
SubscriptionID: srcSubscriptionID,
389-
ResourceGroup: srcResourceGroupName,
390-
GetLatestAccountKey: getLatestAccountKey,
391-
}
392-
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
393-
if err != nil {
394-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
395-
}
396-
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
397-
default:
398-
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
375+
// var srcVolumeID string
376+
// if volContentSource.GetVolume() != nil {
377+
// srcVolumeID = volContentSource.GetVolume().GetVolumeId()
378+
// }
379+
// srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
380+
// if err != nil {
381+
// return nil, status.Error(codes.NotFound, err.Error())
382+
// }
383+
// srcAccountOptions := &azure.AccountOptions{
384+
// Name: srcAccountName,
385+
// SubscriptionID: srcSubscriptionID,
386+
// ResourceGroup: srcResourceGroupName,
387+
// GetLatestAccountKey: getLatestAccountKey,
388+
// }
389+
// srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
390+
// if err != nil {
391+
// return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
392+
// }
393+
// srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
394+
// default:
395+
// return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
399396
}
400397
}
401398

@@ -466,18 +463,25 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
466463
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
467464
}
468465
if volContentSource != nil {
469-
dstAzcopyAuthEnv := srcAzcopyAuthEnv
470-
dstAccountSASToken := srcAccountSASToken
471-
if srcAccountName != accountName {
472-
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
473-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
474-
}
466+
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
467+
if err != nil {
468+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
475469
}
476-
477-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
478-
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
470+
if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil {
479471
return nil, err
480472
}
473+
// dstAzcopyAuthEnv := srcAzcopyAuthEnv
474+
// dstAccountSASToken := srcAccountSASToken
475+
// if srcAccountName != accountName {
476+
// if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
477+
// return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
478+
// }
479+
// }
480+
481+
// dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
482+
// if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
483+
// return nil, err
484+
// }
481485
}
482486

483487
if storeAccountKey && len(req.GetSecrets()) == 0 {
@@ -786,11 +790,36 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
786790
}
787791

788792
// copyBlobContainer copies source volume content into a destination volume
789-
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
793+
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
794+
var sourceVolumeID string
795+
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
796+
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
790797

791-
if srcPath == "" || dstPath == "" || dstContainerName == "" {
792-
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
793798
}
799+
resourceGroupName, srcAccountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
800+
if err != nil {
801+
return status.Error(codes.NotFound, err.Error())
802+
}
803+
if dstAccountName == "" {
804+
dstAccountName = srcAccountName
805+
}
806+
if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
807+
return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
808+
}
809+
srcAccountSasToken := dstAccountSasToken
810+
if srcAccountName != dstAccountName && dstAccountSasToken != "" {
811+
srcAccountOptions := &azure.AccountOptions{
812+
Name: srcAccountName,
813+
ResourceGroup: accountOptions.ResourceGroup,
814+
SubscriptionID: accountOptions.SubscriptionID,
815+
GetLatestAccountKey: accountOptions.GetLatestAccountKey,
816+
}
817+
if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil {
818+
return err
819+
}
820+
}
821+
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
822+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
794823

795824
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
796825
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
@@ -800,13 +829,9 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
800829
case util.AzcopyJobRunning:
801830
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
802831
case util.AzcopyJobNotFound:
803-
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
832+
klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
804833
execFunc := func() error {
805-
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false")
806-
if len(authAzcopyEnv) > 0 {
807-
cmd.Env = append(os.Environ(), authAzcopyEnv...)
808-
}
809-
if out, err := cmd.CombinedOutput(); err != nil {
834+
if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
810835
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
811836
}
812837
return nil
@@ -817,15 +842,38 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
817842
}
818843
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
819844
if copyErr != nil {
820-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
845+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, dstAccountName, dstContainerName, copyErr)
821846
} else {
822-
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
847+
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
823848
}
824849
return copyErr
825850
}
826851
return err
827852
}
828853

854+
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
855+
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
856+
vs := req.VolumeContentSource
857+
switch vs.Type.(type) {
858+
case *csi.VolumeContentSource_Snapshot:
859+
return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
860+
case *csi.VolumeContentSource_Volume:
861+
return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
862+
default:
863+
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
864+
}
865+
}
866+
867+
// execAzcopyCopy exec azcopy copy command
868+
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
869+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
870+
cmd.Args = append(cmd.Args, azcopyCopyOptions...)
871+
if len(authAzcopyEnv) > 0 {
872+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
873+
}
874+
return cmd.CombinedOutput()
875+
}
876+
829877
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
830878
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
831879
azureAuthConfig := d.cloud.Config.AzureAuthConfig

0 commit comments

Comments
 (0)