48 changes: 34 additions & 14 deletions README.md
@@ -28,20 +28,29 @@ with Google Cloud Platform as a cloud provider.
```
5. [Create Google Cloud Platform account](https://cloud.google.com/free).
6. [Create a new GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects).
7. [Create GCP bucket](https://cloud.google.com/storage/docs/creating-buckets)
8. Create storage integration object in Snowflake using the following command:
7. Depending on whether you use GCS or S3 as the file system, execute one of the following commands to create a storage integration object in Snowflake:
```
CREATE OR REPLACE STORAGE INTEGRATION <INTEGRATION NAME>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET NAME>/');
```
```
CREATE OR REPLACE STORAGE INTEGRATION <INTEGRATION NAME>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<ARN ROLE NAME>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<BUCKET NAME>/');
```
Please note that the `gcs` prefix is used here, not `gs`.
9. Authorize Snowflake to operate on your bucket by following [Step 3. Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects)
10. Setup gcloud on your computer by following [Using the Google Cloud SDK installer](https://cloud.google.com/sdk/docs/downloads-interactive)
11. [Install gradle](https://gradle.org/install/)
12. Run following command to set gradle wrapper
8. Authorize Snowflake to operate on your bucket:
    1. For GCS, follow [Step 3. Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects)
    1. For S3, follow [Configuring a Snowflake Storage Integration](https://docs.snowflake.com/en/user-guide/data-load-s3-config.html#option-1-configuring-a-snowflake-storage-integration)
9. Setup gcloud on your computer by following [Using the Google Cloud SDK installer](https://cloud.google.com/sdk/docs/downloads-interactive)
10. [Install gradle](https://gradle.org/install/)
11. Run the following command to set up the Gradle wrapper:
```
gradle wrapper
```
@@ -64,27 +73,30 @@ An example consists of two pipelines:
```
./gradlew run -PmainClass=batching.WordCountExample --args=" \
--inputFile=gs://apache-beam-samples/shakespeare/* \
--output=gs://<GCS BUCKET NAME>/counts \
--output=<gs or s3>://<GCS OR S3 BUCKET NAME>/counts \
--serverName=<SNOWFLAKE SERVER NAME> \
--username=<SNOWFLAKE USERNAME> \
--password=<SNOWFLAKE PASSWORD> \
--database=<SNOWFLAKE DATABASE> \
--schema=<SNOWFLAKE SCHEMA> \
--tableName=<SNOWFLAKE TABLE NAME> \
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
--stagingBucketName=<GCS BUCKET NAME> \
--stagingBucketName=<GCS OR S3 BUCKET NAME> \
--runner=<DirectRunner/DataflowRunner> \
--project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
--gcpTempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING WITH gs://> \
--region=<FOR DATAFLOW RUNNER: GCP REGION> \
--awsRegion=<OPTIONAL: AWS REGION IN CASE OF USING S3> \
--awsAccessKey=<OPTIONAL: AWS ACCESS KEY IN CASE OF USING S3>\
--awsSecretKey=<OPTIONAL: AWS SECRET KEY IN CASE OF USING S3>\
--appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
```
2. Go to the Snowflake console to check saved counts:
```
select * from <DATABASE NAME>.<SCHEMA NAME>.<TABLE NAME>;
```
![Batching snowflake result](./images/batching_snowflake_result.png)
3. Go to GCS bucket to check saved files:
3. Go to the GCS or S3 bucket to check saved files:
![Batching gcs result](./images/batching_gcs_result.png)
4. Go to DataFlow to check submitted jobs:
![Batching DataFlow result](./images/batching_dataflow_result.png)
@@ -102,12 +114,17 @@ An example is streaming taxi rides from PubSub into Snowflake.
lat double
);
```
2. [Create Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html)
2. Depending on whether you use GCS or S3, execute one of the following commands to [create a Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html)
```
create or replace stage <STAGE NAME>
url = 'gcs://<GCS BUCKET NAME>/data/'
storage_integration = <INTEGRATION NAME>;
```
```
create or replace stage <STAGE NAME>
url = 's3://<S3 BUCKET NAME>/data/'
storage_integration = <INTEGRATION NAME>;
```
Note: SnowflakeIO requires that the URL have `/data/` as a suffix.
3. [Create a key pair](https://docs.snowflake.com/en/user-guide/snowsql-start.html#using-key-pair-authentication)
for the authentication process.
@@ -133,10 +150,13 @@ for authentication process.
--schema=<SNOWFLAKE SCHEMA> \
--snowPipe=<SNOWFLAKE SNOWPIPE NAME> \
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
--stagingBucketName=<GCS BUCKET NAME> \
--stagingBucketName=<GCS OR S3 BUCKET NAME> \
--runner=<DirectRunner/DataflowRunner> \
--project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
--region=<FOR DATAFLOW RUNNER: GCP REGION> \
--awsRegion=<OPTIONAL: AWS REGION IN CASE OF USING S3> \
--awsAccessKey=<OPTIONAL: AWS ACCESS KEY IN CASE OF USING S3>\
--awsSecretKey=<OPTIONAL: AWS SECRET KEY IN CASE OF USING S3>\
--appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
```
2. Go to the Snowflake console to check saved taxi rides:
@@ -166,7 +186,7 @@ list for currently supported runtime options.
--templateLocation=gs://<GCS BUCKET NAME>/templates/<TEMPLATE NAME>\
--region=<GCP REGION>\
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
--stagingBucketName=gs://<GCS BUCKET NAME>/ \
--stagingBucketName=<gs or s3>://<GCS OR S3 BUCKET NAME>/ \
--username=<SNOWFLAKE USERNAME>\
--database=<SNOWFLAKE DATABASE> \
--schema=<SNOWFLAKE SCHEMA> \
@@ -225,7 +245,7 @@ list for currently supported runtime options.
* --serverName= full server name with account, zone and domain.
* --username= required for username/password and Private Key authentication.
* --password= required for username/password authentication only
* --stagingBucketName= external bucket path ending with `/`. I.e. `gs://bucket/`. Sub-directories are allowed.
* --stagingBucketName= external bucket path ending with `/`, e.g. `<gs or s3>://bucket/`. Sub-directories are allowed.
* --rawPrivateKey= raw private key. Required for Private Key authentication only.
* --privateKeyPassphrase= private Key's passphrase. Required for Private Key authentication only.
* --storageIntegrationName= storage integration name
@@ -297,7 +317,7 @@ python -m pip install apachebeam_snowflake.whl
```
2. [Go to Flink console](http://localhost:8081/)
![Xlang Flink result](./images/xlang_flink_result.png)
3. Go to GCS bucket to check saved files:
3. Go to GCS or S3 bucket to check saved files:
![Xlang GCS result](./images/xlang_gcs_result.png)
4. Check the console:
![Xlang console result](./images/xlang_console_result.png)
1 change: 1 addition & 0 deletions build.gradle
@@ -23,6 +23,7 @@ repositories {
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile files('beam-sdks-java-io-snowflake-2.22.0-SNAPSHOT.jar')
compile 'org.apache.beam:beam-sdks-java-io-amazon-web-services:2.22.0'
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version: '2.22.0'
compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.22.0'
4 changes: 3 additions & 1 deletion src/main/java/batching/SnowflakeWordCountOptions.java
@@ -4,11 +4,13 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.io.aws.options.S3Options;
import util.AwsOptions;

/**
* Supported PipelineOptions used in provided examples.
*/
public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions {
public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, AwsOptions, S3Options {

@Description("Path of the file to read from")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
6 changes: 4 additions & 2 deletions src/main/java/batching/WordCountExample.java
@@ -23,10 +23,11 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import util.AwsOptionsParser;

/**
* An example that contains batch writing and reading from Snowflake. Inspired by Apache Beam/WordCount-example(https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java)
*
* <p>
* Check main README for more information.
*/
public class WordCountExample {
@@ -35,11 +36,12 @@ public static void main(String[] args) {
SnowflakeWordCountOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakeWordCountOptions.class);

AwsOptionsParser.format(options);

runWritingToSnowflake(options);
runReadingFromSnowflake(options);
}


private static void runWritingToSnowflake(SnowflakeWordCountOptions options) {
Pipeline p = Pipeline.create(options);

11 changes: 6 additions & 5 deletions src/main/java/streaming/TaxiRidesExample.java
@@ -5,10 +5,10 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ToString;
import org.joda.time.Duration;
import util.AwsOptionsParser;

import java.util.UUID;

@@ -23,8 +23,10 @@ public class TaxiRidesExample {
private static final String PUBSUB_TAX_RIDES = "projects/pubsub-public-data/topics/taxirides-realtime";

public static void main(String[] args) {
SnowflakePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakePipelineOptions.class);
TaxiRidesOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(TaxiRidesOptions.class);

AwsOptionsParser.format(options);

Pipeline p = Pipeline.create(options);

@@ -44,15 +46,14 @@ public static void main(String[] args) {
.withSnowPipe(options.getSnowPipe())
.withFileNameTemplate(UUID.randomUUID().toString())
.withFlushTimeLimit(Duration.millis(3000))
.withDebugMode(SnowflakeIO.StreamingLogLevel.INFO)
.withFlushRowLimit(100)
.withQuotationMark("")
.withShardsNumber(1));

p.run();
}

public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(SnowflakePipelineOptions options) {
public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(TaxiRidesOptions options) {
return SnowflakeIO.DataSourceConfiguration.create()
.withKeyPairRawAuth(options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase())
.withKeyPairPathAuth(options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase())
11 changes: 11 additions & 0 deletions src/main/java/streaming/TaxiRidesOptions.java
@@ -0,0 +1,11 @@
package streaming;

import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation.Required;
import util.AwsOptions;

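/** Options for the streaming taxi rides example, combining Snowflake, AWS and S3 pipeline options. */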
public interface TaxiRidesOptions extends SnowflakePipelineOptions, AwsOptions, S3Options {
}
23 changes: 23 additions & 0 deletions src/main/java/util/AwsOptions.java
@@ -0,0 +1,23 @@
package util;

import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;

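/** Pipeline options carrying AWS credentials, used when the staging bucket is an S3 location. */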
public interface AwsOptions extends SnowflakePipelineOptions, S3Options {

@Description("AWS Access Key")
@Default.String("access_key")
String getAwsAccessKey();

void setAwsAccessKey(String awsAccessKey);

@Description("AWS secret key")
@Default.String("secret_key")
String getAwsSecretKey();

void setAwsSecretKey(String awsSecretKey);
}
18 changes: 18 additions & 0 deletions src/main/java/util/AwsOptionsParser.java
@@ -0,0 +1,18 @@
package util;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

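/**
 * Configures static AWS credentials on the pipeline options when the
 * staging bucket URL points at S3 (i.e. starts with "s3").
 */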
public class AwsOptionsParser {

private static final String AWS_S3_PREFIX = "s3";

public static void format(AwsOptions options) {
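// Only set a credentials provider when the staging bucket is an S3 location.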
if (options.getStagingBucketName().get().toLowerCase().startsWith(AWS_S3_PREFIX)) {
options.setAwsCredentialsProvider(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())));
}
}

}