From 3e48a150203a6c46426aa9efd6f229903917683e Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Fri, 9 Oct 2020 09:27:22 +0200 Subject: [PATCH 1/2] feat: s3 support example --- build.gradle | 1 + .../java/batching/SnowflakeWordCountOptions.java | 15 ++++++++++++++- src/main/java/batching/WordCountExample.java | 13 ++++++++++++- src/main/java/streaming/TaxiRidesExample.java | 1 - 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 02baa77..0daa616 100644 --- a/build.gradle +++ b/build.gradle @@ -23,6 +23,7 @@ repositories { dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' compile files('beam-sdks-java-io-snowflake-2.22.0-SNAPSHOT.jar') + compile 'org.apache.beam:beam-sdks-java-io-amazon-web-services:2.22.0' compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0' compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version: '2.22.0' compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.22.0' diff --git a/src/main/java/batching/SnowflakeWordCountOptions.java b/src/main/java/batching/SnowflakeWordCountOptions.java index 6f98425..b5ec282 100644 --- a/src/main/java/batching/SnowflakeWordCountOptions.java +++ b/src/main/java/batching/SnowflakeWordCountOptions.java @@ -4,11 +4,12 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Validation.Required; +import org.apache.beam.sdk.io.aws.options.S3Options; /** * Supported PipelineOptions used in provided examples. 
*/ -public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions { +public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, S3Options { @Description("Path of the file to read from") @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") @@ -21,4 +22,16 @@ public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions { String getOutput(); void setOutput(String value); + + @Description("AWS Access Key") + @Required + String getAwsAccessKey(); + + void setAwsAccessKey(String awsAccessKey); + + @Description("AWS secret key") + @Required + String getAwsSecretKey(); + + void setAwsSecretKey(String awsSecretKey); } \ No newline at end of file diff --git a/src/main/java/batching/WordCountExample.java b/src/main/java/batching/WordCountExample.java index 42d3b6e..dc29109 100644 --- a/src/main/java/batching/WordCountExample.java +++ b/src/main/java/batching/WordCountExample.java @@ -23,10 +23,13 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; /** * An example that contains batch writing and reading from Snowflake. Inspired by Apache Beam/WordCount-example(https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java) - * + *

* Check main README for more information. */ public class WordCountExample { @@ -35,10 +38,18 @@ public static void main(String[] args) { SnowflakeWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakeWordCountOptions.class); + options = parseAwsOptions(options); + runWritingToSnowflake(options); runReadingFromSnowflake(options); } + private static SnowflakeWordCountOptions parseAwsOptions(SnowflakeWordCountOptions options) { + AWSCredentials awsCredentials = new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey()); + options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials)); + + return options; + } private static void runWritingToSnowflake(SnowflakeWordCountOptions options) { Pipeline p = Pipeline.create(options); diff --git a/src/main/java/streaming/TaxiRidesExample.java b/src/main/java/streaming/TaxiRidesExample.java index c9c05d3..60da455 100644 --- a/src/main/java/streaming/TaxiRidesExample.java +++ b/src/main/java/streaming/TaxiRidesExample.java @@ -44,7 +44,6 @@ public static void main(String[] args) { .withSnowPipe(options.getSnowPipe()) .withFileNameTemplate(UUID.randomUUID().toString()) .withFlushTimeLimit(Duration.millis(3000)) - .withDebugMode(SnowflakeIO.StreamingLogLevel.INFO) .withFlushRowLimit(100) .withQuotationMark("") .withShardsNumber(1)); From 1de662bcbbf01f2ba89ac84f24e580c99afd3193 Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Fri, 9 Oct 2020 13:13:49 +0200 Subject: [PATCH 2/2] feat(aws): Supporting batching and streaming, doesn't support templates --- README.md | 48 +++++++++++++------ .../batching/SnowflakeWordCountOptions.java | 15 +----- src/main/java/batching/WordCountExample.java | 13 +---- src/main/java/streaming/TaxiRidesExample.java | 10 ++-- src/main/java/streaming/TaxiRidesOptions.java | 11 +++++ src/main/java/util/AwsOptions.java | 23 +++++++++ src/main/java/util/AwsOptionsParser.java | 18 +++++++ 7 files changed, 96 insertions(+), 42 
deletions(-) create mode 100644 src/main/java/streaming/TaxiRidesOptions.java create mode 100644 src/main/java/util/AwsOptions.java create mode 100644 src/main/java/util/AwsOptionsParser.java diff --git a/README.md b/README.md index 6f70a47..3f12fb0 100644 --- a/README.md +++ b/README.md @@ -28,8 +28,7 @@ with Google Cloud Platform as a cloud provider. ``` 5. [Create Google Cloud Platform account](https://cloud.google.com/free). 6. [Create a new GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects). -7. [Create GCP bucket](https://cloud.google.com/storage/docs/creating-buckets) -8. Create storage integration object in Snowflake using the following command: +7. Depending on using GCS or S3 as file system execute one of the following commands to create storage integration object in Snowflake: ``` CREATE OR REPLACE STORAGE INTEGRATION TYPE = EXTERNAL_STAGE @@ -37,11 +36,21 @@ with Google Cloud Platform as a cloud provider. ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs:///'); ``` + ``` + CREATE STORAGE INTEGRATION aws_integration + TYPE = EXTERNAL_STAGE + STORAGE_PROVIDER = S3 + ENABLED = TRUE + STORAGE_AWS_ROLE_ARN = '' + STORAGE_ALLOWED_LOCATIONS = ('s3:///') + ``` Please note that `gcs` prefix is used here, not `gs`. -9. Authorize Snowflake to operate on your bucket by following [Step 3. Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects) -10. Setup gcloud on your computer by following [Using the Google Cloud SDK installer](https://cloud.google.com/sdk/docs/downloads-interactive) -11. [Install gradle](https://gradle.org/install/) -12. Run following command to set gradle wrapper +8. Authorize Snowflake to operate on your bucket + 1. For GCS follow [Step 3. 
Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects) + 1. For S3 follow [Configuring a Snowflake Storage Integration](https://docs.snowflake.com/en/user-guide/data-load-s3-config.html#option-1-configuring-a-snowflake-storage-integration) +9. Setup gcloud on your computer by following [Using the Google Cloud SDK installer](https://cloud.google.com/sdk/docs/downloads-interactive) +10. [Install gradle](https://gradle.org/install/) +11. Run following command to set gradle wrapper ``` gradle wrapper ``` @@ -64,7 +73,7 @@ An example consists of two pipelines: ``` ./gradlew run -PmainClass=batching.WordCountExample --args=" \ --inputFile=gs://apache-beam-samples/shakespeare/* \ - --output=gs:///counts \ + --output=:///counts \ --serverName= \ --username= \ --password= \ @@ -72,11 +81,14 @@ An example consists of two pipelines: --schema= \ --tableName= \ --storageIntegrationName= \ - --stagingBucketName= \ + --stagingBucketName= \ --runner= \ --project= \ --gcpTempLocation= \ --region= \ + --awsRegion= \ + --awsAccessKey=\ + --awsSecretKey=\ --appName=" ``` 2. Go to Snowflake console to check saved counts: @@ -84,7 +96,7 @@ An example consists of two pipelines: select from ..; ``` ![Batching snowflake result](./images/batching_snowflake_result.png) -3. Go to GCS bucket to check saved files: +3. Go to GCS or S3 bucket to check saved files: ![Batching gcs result](./images/batching_gcs_result.png) 4. Go to DataFlow to check submitted jobs: ![Batching DataFlow result](./images/batching_dataflow_result.png) @@ -102,12 +114,17 @@ An example is streaming taxi rides from PubSub into Snowflake. lat double ); ``` -2. [Create Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html) +2. 
Depending on using GCS or S3 execute one of the following commands to [create Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html) ``` create or replace stage url = 'gcs:///data/' storage_integration = ; ``` + ``` + create stage + url = 'S3:///data/' + storage_integration = ; + ``` note: SnowflakeIO requires that url must have /data/ as a sufix 3. [Create Key/Pair](https://docs.snowflake.com/en/user-guide/snowsql-start.html#using-key-pair-authentication) for authentication process. @@ -133,10 +150,13 @@ for authentication process. --schema= \ --snowPipe= \ --storageIntegrationName= \ - --stagingBucketName= \ + --stagingBucketName= \ --runner= \ --project= \ --region= \ + --awsRegion= \ + --awsAccessKey=\ + --awsSecretKey=\ --appName=" ``` 2. Go to Snowflake console to check saved taxi rides: @@ -166,7 +186,7 @@ list for currently supported runtime options. --templateLocation=gs:///templates/