Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 111 additions & 9 deletions cli/azd/pkg/azsdk/storage/storage_blob_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"path/filepath"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand Down Expand Up @@ -56,8 +57,10 @@ func NewBlobClient(
}

type blobClient struct {
config *AccountConfig
client *azblob.Client
config *AccountConfig
client *azblob.Client
containerVerified bool
mu sync.Mutex
}

// Blob represents a blob within a storage account container.
Expand All @@ -70,10 +73,25 @@ type Blob struct {

// Items returns a list of blobs in the configured storage account container.
func (bc *blobClient) Items(ctx context.Context) ([]*Blob, error) {
if err := bc.ensureContainerExists(ctx); err != nil {
if err := bc.ensureContainerReady(ctx); err != nil {
return nil, err
}

blobs, err := bc.listBlobs(ctx)
if err != nil {
if bc.isContainerNotFound(err) {
if createErr := bc.resetAndEnsureContainer(ctx); createErr != nil {
return nil, createErr
}
return bc.listBlobs(ctx)
}
return nil, err
}

return blobs, nil
}

func (bc *blobClient) listBlobs(ctx context.Context) ([]*Blob, error) {
blobs := []*Blob{}

pager := bc.client.NewListBlobsFlatPager(bc.config.ContainerName, nil)
Expand All @@ -98,12 +116,22 @@ func (bc *blobClient) Items(ctx context.Context) ([]*Blob, error) {

// Download downloads a blob from the configured storage account container.
func (bc *blobClient) Download(ctx context.Context, blobPath string) (io.ReadCloser, error) {
if err := bc.ensureContainerExists(ctx); err != nil {
if err := bc.ensureContainerReady(ctx); err != nil {
return nil, err
}

resp, err := bc.client.DownloadStream(ctx, bc.config.ContainerName, blobPath, nil)
if err != nil {
if bc.isContainerNotFound(err) {
if createErr := bc.resetAndEnsureContainer(ctx); createErr != nil {
return nil, createErr
}
resp, err = bc.client.DownloadStream(ctx, bc.config.ContainerName, blobPath, nil)
if err != nil {
return nil, fmt.Errorf("failed to download blob '%s', %w", blobPath, err)
}
return resp.Body, nil
}
return nil, fmt.Errorf("failed to download blob '%s', %w", blobPath, err)
}

Expand All @@ -112,12 +140,33 @@ func (bc *blobClient) Download(ctx context.Context, blobPath string) (io.ReadClo

// Upload uploads a blob to the configured storage account container.
func (bc *blobClient) Upload(ctx context.Context, blobPath string, reader io.Reader) error {
if err := bc.ensureContainerExists(ctx); err != nil {
if err := bc.ensureContainerReady(ctx); err != nil {
return err
}

_, err := bc.client.UploadStream(ctx, bc.config.ContainerName, blobPath, reader, nil)
if err != nil {
if bc.isContainerNotFound(err) {
if createErr := bc.resetAndEnsureContainer(ctx); createErr != nil {
return createErr
}
// Only retry if the reader supports seeking back to the start.
// io.Reader is non-rewindable, so retrying with an exhausted
// reader would upload empty/partial content.
if seeker, ok := reader.(io.Seeker); ok {
if _, seekErr := seeker.Seek(0, io.SeekStart); seekErr == nil {
_, err = bc.client.UploadStream(
ctx, bc.config.ContainerName, blobPath, reader, nil)
if err != nil {
return fmt.Errorf(
"failed to upload blob '%s', %w", blobPath, err)
}
return nil
}
}
// Container re-created but can't retry upload; caller must retry
return fmt.Errorf("failed to upload blob '%s', %w", blobPath, err)
}
return fmt.Errorf("failed to upload blob '%s', %w", blobPath, err)
}

Expand All @@ -126,25 +175,78 @@ func (bc *blobClient) Upload(ctx context.Context, blobPath string, reader io.Rea

// Delete deletes a blob from the configured storage account container.
func (bc *blobClient) Delete(ctx context.Context, blobPath string) error {
if err := bc.ensureContainerExists(ctx); err != nil {
if err := bc.ensureContainerReady(ctx); err != nil {
return err
}

_, err := bc.client.DeleteBlob(ctx, bc.config.ContainerName, blobPath, nil)
if err != nil {
if bc.isContainerNotFound(err) {
if createErr := bc.resetAndEnsureContainer(ctx); createErr != nil {
return createErr
}
_, err = bc.client.DeleteBlob(ctx, bc.config.ContainerName, blobPath, nil)
if err != nil {
return fmt.Errorf("failed to delete blob '%s', %w", blobPath, err)
}
return nil
}
return fmt.Errorf("failed to delete blob '%s', %w", blobPath, err)
}

return nil
}

// Check if the specified container exists
// If it doesn't already exist then create it
// ensureContainerReady checks that the container exists on the first call,
// then skips the check on subsequent calls. If a container-not-found error
// occurs during an operation, callers use resetAndEnsureContainer to recover.
func (bc *blobClient) ensureContainerReady(ctx context.Context) error {
bc.mu.Lock()
defer bc.mu.Unlock()

if bc.containerVerified {
return nil
}

if err := bc.ensureContainerExists(ctx); err != nil {
return err
}

bc.containerVerified = true
return nil
}

// resetAndEnsureContainer resets the verified flag and re-checks/creates the container.
// Used when an operation fails with container-not-found (e.g., container deleted externally).
func (bc *blobClient) resetAndEnsureContainer(ctx context.Context) error {
bc.mu.Lock()
defer bc.mu.Unlock()

bc.containerVerified = false

if err := bc.ensureContainerExists(ctx); err != nil {
return err
}

bc.containerVerified = true
return nil
}

// isContainerNotFound checks if the error indicates the container was not found.
func (bc *blobClient) isContainerNotFound(err error) bool {
var respErr *azcore.ResponseError
if errors.As(err, &respErr) {
return respErr.ErrorCode == "ContainerNotFound"
}
return false
}

// ensureContainerExists checks if the container exists and creates it if not.
func (bc *blobClient) ensureContainerExists(ctx context.Context) error {
exists := false

pager := bc.client.NewListContainersPager(nil)
for pager.More() {
for pager.More() && !exists {
page, err := pager.NextPage(ctx)
if err != nil {
return fmt.Errorf("failed getting next page of containers: %w", err)
Expand Down
Loading