/**
 * Asserts that two credentials objects are equivalent: same access key id, same
 * secret key and, when the expected credentials are session credentials, the same
 * session token. When the expected credentials are not session credentials, the
 * actual credentials must not be session credentials either.
 *
 * @param expected the credentials the test expects
 * @param actual   the credentials produced by the code under test
 */
private static void assertEqualCredentials(AWSCredentials expected, AWSCredentials actual) {
    assertEquals(expected.getAWSAccessKeyId(), actual.getAWSAccessKeyId());
    // Previously the access key id was compared twice and the secret key never was.
    assertEquals(expected.getAWSSecretKey(), actual.getAWSSecretKey());

    if (expected instanceof AWSSessionCredentials) {
        assertTrue(actual instanceof AWSSessionCredentials);
        AWSSessionCredentials expectedSession = (AWSSessionCredentials) expected;
        AWSSessionCredentials actualSession = (AWSSessionCredentials) actual;
        assertEquals(expectedSession.getSessionToken(), actualSession.getSessionToken());
    } else {
        assertFalse(actual instanceof AWSSessionCredentials);
    }
}
/**
 * Tests two profiles having same name. The second profile overrides the first profile. Also
 * checks if the AWS Access Key ID, AWS Secret Access Key and session token are mapped
 * properly under the profile.
 */
@Test
public void testTwoProfileWithSameName() throws URISyntaxException {
    ProfilesConfigFile profile = new ProfilesConfigFile(
            ProfileResourceLoader.profilesWithSameProfileName().asFile());

    AWSCredentials defaultCred = profile.getCredentials(DEFAULT_PROFILE_NAME);
    assertNotNull(defaultCred);
    assertTrue(defaultCred instanceof BasicAWSCredentials);

    AWSCredentials testCred = profile.getCredentials(PROFILE_NAME_TEST);
    assertNotNull(testCred);
    assertTrue(testCred instanceof AWSSessionCredentials);

    AWSSessionCredentials testSessionCred = (AWSSessionCredentials) testCred;
    // assertEquals takes (expected, actual); the swapped order produced misleading
    // failure messages ("expected <actual value> but was <testProfile2>").
    assertEquals("testProfile2", testSessionCred.getAWSAccessKeyId());
    assertEquals("testProfile2", testSessionCred.getAWSSecretKey());
    assertEquals("testProfile2", testSessionCred.getSessionToken());
}
/**
 * Signs the request by adding the {@code AWSAccessKeyId}, {@code Expires} and
 * {@code Signature} query parameters. Session credentials are first passed to
 * {@code addSessionCredentials} so they are part of the request before the
 * canonical string is built.
 *
 * @param request     the request to sign; it is modified in place
 * @param credentials the credentials to sign with; sanitized before use
 * @throws SdkClientException declared by this method's signature
 */
public void sign(SignableRequest<?> request, AWSCredentials credentials) throws SdkClientException {
    final AWSCredentials signingCredentials = sanitizeCredentials(credentials);
    if (signingCredentials instanceof AWSSessionCredentials) {
        addSessionCredentials(request, (AWSSessionCredentials) signingCredentials);
    }

    // Expiration is expressed in whole seconds since the epoch.
    final String expiresEpochSeconds = Long.toString(expiration.getTime() / 1000L);
    final String stringToSign =
            RestUtils.makeS3CanonicalString(httpVerb, resourcePath, request, expiresEpochSeconds);
    final String encodedSignature = super.signAndBase64Encode(
            stringToSign, signingCredentials.getAWSSecretKey(), SigningAlgorithm.HmacSHA1);

    request.addParameter("AWSAccessKeyId", signingCredentials.getAWSAccessKeyId());
    request.addParameter("Expires", expiresEpochSeconds);
    request.addParameter("Signature", encodedSignature);
}
/**
 * By default, get credentials from the {@link DefaultAWSCredentialsProviderChain}.
 *
 * <p>Each call to {@code get()} asks the chain for credentials and copies the access
 * key id, secret key and (for session credentials only) the session token into a new
 * {@code AWSCredentials}; the session token is {@code null} otherwise.
 */
@Bean
@ConditionalOnMissingBean
AWSCredentials.Provider credentials() {
    return new AWSCredentials.Provider() {
        AWSCredentialsProvider delegate = new DefaultAWSCredentialsProviderChain();

        @Override
        public AWSCredentials get() {
            com.amazonaws.auth.AWSCredentials resolved = delegate.getCredentials();

            String sessionToken = null;
            if (resolved instanceof AWSSessionCredentials) {
                sessionToken = ((AWSSessionCredentials) resolved).getSessionToken();
            }

            return new AWSCredentials(
                    resolved.getAWSAccessKeyId(),
                    resolved.getAWSSecretKey(),
                    sessionToken);
        }
    };
}
/**
 * Test helper: creates a Redshift load operator for the given config, derives its
 * copy configuration using mocked session credentials, and returns the COPY
 * statement built from it.
 *
 * @param configInput the operator configuration
 * @param maskConfig  forwarded to {@code buildCopyStatement}
 * @return the statement produced by {@code RedshiftConnection.buildCopyStatement}
 */
private String getCopyConfig(Map<String, Object> configInput, boolean maskConfig) throws IOException {
    TaskRequest request = testHelper.createTaskRequest(configInput, Optional.absent());

    OperatorContext context = mock(OperatorContext.class);
    when(context.getProjectPath()).thenReturn(testHelper.projectPath());
    when(context.getTaskRequest()).thenReturn(request);

    RedshiftLoadOperatorFactory.RedshiftLoadOperator loadOperator =
            (RedshiftLoadOperatorFactory.RedshiftLoadOperator) operatorFactory.newOperator(context);
    assertThat(loadOperator, is(instanceOf(RedshiftLoadOperatorFactory.RedshiftLoadOperator.class)));

    // Only the key id and secret are stubbed on the mocked session credentials.
    AWSSessionCredentials mockedCredentials = mock(AWSSessionCredentials.class);
    when(mockedCredentials.getAWSAccessKeyId()).thenReturn("my-access-key-id");
    when(mockedCredentials.getAWSSecretKey()).thenReturn("my-secret-access-key");

    RedshiftConnection.CopyConfig copyConfig =
            loadOperator.createCopyConfig(testHelper.createConfig(configInput), mockedCredentials);

    Connection jdbcConnection = mock(Connection.class);
    return new RedshiftConnection(jdbcConnection).buildCopyStatement(copyConfig, maskConfig);
}
/**
 * Test helper: creates a Redshift unload operator for the given config, derives its
 * unload configuration using mocked session credentials and the given query id, and
 * returns the UNLOAD statement built from it.
 *
 * @param configInput the operator configuration
 * @param queryId     the query id passed to {@code createUnloadConfig}
 * @param maskConfig  forwarded to {@code buildUnloadStatement}
 * @return the statement produced by {@code RedshiftConnection.buildUnloadStatement}
 */
private String getUnloadConfig(Map<String, Object> configInput, String queryId, boolean maskConfig) throws IOException {
    TaskRequest request = testHelper.createTaskRequest(configInput, Optional.absent());

    OperatorContext context = mock(OperatorContext.class);
    when(context.getProjectPath()).thenReturn(testHelper.projectPath());
    when(context.getTaskRequest()).thenReturn(request);

    RedshiftUnloadOperatorFactory.RedshiftUnloadOperator unloadOperator =
            (RedshiftUnloadOperatorFactory.RedshiftUnloadOperator) operatorFactory.newOperator(context);
    assertThat(unloadOperator, is(instanceOf(RedshiftUnloadOperatorFactory.RedshiftUnloadOperator.class)));

    // Only the key id and secret are stubbed on the mocked session credentials.
    AWSSessionCredentials mockedCredentials = mock(AWSSessionCredentials.class);
    when(mockedCredentials.getAWSAccessKeyId()).thenReturn("my-access-key-id");
    when(mockedCredentials.getAWSSecretKey()).thenReturn("my-secret-access-key");

    RedshiftConnection.UnloadConfig unloadConfig =
            unloadOperator.createUnloadConfig(testHelper.createConfig(configInput), mockedCredentials, queryId);

    Connection jdbcConnection = mock(Connection.class);
    return new RedshiftConnection(jdbcConnection).buildUnloadStatement(unloadConfig, maskConfig);
}
/**
 * When no service-specific ({@code aws.s3.*}) paths exist but {@code aws.sessionToken}
 * does, the provider for service "s3" must return session credentials built from the
 * global {@code aws.*} values.
 */
@Test
public void serviceWithSessionToken() throws Exception {
    final String accessKey = "accessKey";
    final String secretKey = "secretKey";
    final String sessionToken = "sessionToken";

    new MockUnit(Config.class)
        .expect(unit -> {
            Config conf = unit.get(Config.class);
            // No service-level overrides are present.
            expect(conf.hasPath("aws.s3.accessKey")).andReturn(false);
            expect(conf.hasPath("aws.s3.secretKey")).andReturn(false);
            expect(conf.hasPath("aws.s3.sessionToken")).andReturn(false);
            // The global session token is present.
            expect(conf.hasPath("aws.sessionToken")).andReturn(true);
            expect(conf.getString("aws.accessKey")).andReturn(accessKey);
            expect(conf.getString("aws.secretKey")).andReturn(secretKey);
            expect(conf.getString("aws.sessionToken")).andReturn(sessionToken);
        })
        .run(unit -> {
            ConfigCredentialsProvider provider = new ConfigCredentialsProvider(unit.get(Config.class));
            AWSSessionCredentials sessionCredentials =
                    (AWSSessionCredentials) provider.service("s3").getCredentials();
            assertEquals("accessKey", sessionCredentials.getAWSAccessKeyId());
            assertEquals("secretKey", sessionCredentials.getAWSSecretKey());
            assertEquals("sessionToken", sessionCredentials.getSessionToken());
        });
}
/**
 * When only the service-specific session token ({@code aws.s3.sessionToken}) exists,
 * the provider for service "s3" must return session credentials that combine the
 * global key id and secret with the service-level token.
 */
@Test
public void serviceWithCustomSessionToken() throws Exception {
    final String accessKey = "accessKey";
    final String secretKey = "secretKey";
    final String sessionToken = "sessionToken";

    new MockUnit(Config.class)
        .expect(unit -> {
            Config conf = unit.get(Config.class);
            // Key id and secret have no service-level override ...
            expect(conf.hasPath("aws.s3.accessKey")).andReturn(false);
            expect(conf.hasPath("aws.s3.secretKey")).andReturn(false);
            // ... but the session token does.
            expect(conf.hasPath("aws.s3.sessionToken")).andReturn(true);
            expect(conf.getString("aws.accessKey")).andReturn(accessKey);
            expect(conf.getString("aws.secretKey")).andReturn(secretKey);
            expect(conf.getString("aws.s3.sessionToken")).andReturn(sessionToken);
        })
        .run(unit -> {
            ConfigCredentialsProvider provider = new ConfigCredentialsProvider(unit.get(Config.class));
            AWSSessionCredentials sessionCredentials =
                    (AWSSessionCredentials) provider.service("s3").getCredentials();
            assertEquals("accessKey", sessionCredentials.getAWSAccessKeyId());
            assertEquals("secretKey", sessionCredentials.getAWSSecretKey());
            assertEquals("sessionToken", sessionCredentials.getSessionToken());
        });
}
/**
 * Creates a profile from a name and a set of credentials. The access key id and
 * secret key are always recorded as profile properties; the session token is
 * recorded only when the credentials are {@link AWSSessionCredentials}. The
 * credentials themselves are wrapped in a {@code StaticCredentialsProvider}.
 *
 * @param profileName    the name of the profile
 * @param awsCredentials the credentials backing the profile
 */
public Profile(String profileName, AWSCredentials awsCredentials) {
    // LinkedHashMap keeps the properties in insertion order.
    Map<String, String> profileProperties = new LinkedHashMap<String, String>();
    profileProperties.put(ProfileKeyConstants.AWS_ACCESS_KEY_ID, awsCredentials.getAWSAccessKeyId());
    profileProperties.put(ProfileKeyConstants.AWS_SECRET_ACCESS_KEY, awsCredentials.getAWSSecretKey());

    if (awsCredentials instanceof AWSSessionCredentials) {
        String token = ((AWSSessionCredentials) awsCredentials).getSessionToken();
        profileProperties.put(ProfileKeyConstants.AWS_SESSION_TOKEN, token);
    }

    this.profileName = profileName;
    this.properties = profileProperties;
    this.awsCredentials = new StaticCredentialsProvider(awsCredentials);
}
/**
 * Returns a copy of the given headers extended with the headers needed to
 * authenticate the request: a date header (unless {@code DATE} is already present),
 * the session token (when the current credentials are session credentials) and the
 * {@code AUTHORIZATION} header computed over the canonical request.
 *
 * @param uri         the request URI, used verbatim in the canonical request
 * @param method      the HTTP method
 * @param queryParams the query parameters, rendered by {@code queryParamsString}
 * @param headers     the headers already set on the request; not modified
 * @param payload     the request body; an empty body is hashed when absent
 * @return an immutable map holding the original and the added headers
 */
public Map<String, String> getSignedHeaders(String uri, String method, Map<String, String> queryParams,
                                            Map<String, String> headers, Optional<byte[]> payload) {
    final LocalDateTime now = clock.get();
    final AWSCredentials credentials = credentialsProvider.getCredentials();

    // Case-insensitive, sorted map: headers are looked up and iterated regardless of case.
    final Map<String, String> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    result.putAll(headers);
    if (!result.containsKey(DATE)) {
        result.put(X_AMZ_DATE, now.format(BASIC_TIME_FORMAT));
    }
    if (credentials instanceof AWSSessionCredentials) {
        result.put(SESSION_TOKEN, ((AWSSessionCredentials) credentials).getSessionToken());
    }

    final StringBuilder headersString = new StringBuilder();
    final ImmutableList.Builder<String> signedHeaders = ImmutableList.builder();
    for (Map.Entry<String, String> entry : result.entrySet()) {
        headersString.append(headerAsString(entry)).append(RETURN);
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (e.g. under a Turkish locale "I" would otherwise become a dotless "ı").
        signedHeaders.add(entry.getKey().toLowerCase(java.util.Locale.ROOT));
    }
    final String signedHeaderKeys = JOINER.join(signedHeaders.build());

    final String canonicalRequest = method + RETURN
            + uri + RETURN
            + queryParamsString(queryParams) + RETURN
            + headersString.toString() + RETURN
            + signedHeaderKeys + RETURN
            + toBase16(hash(payload.orElse(EMPTY.getBytes(Charsets.UTF_8))));

    final String stringToSign = createStringToSign(canonicalRequest, now);
    final String signature = sign(stringToSign, now, credentials);
    final String authorizationHeader = AWS4_HMAC_SHA256_CREDENTIAL
            + credentials.getAWSAccessKeyId() + SLASH + getCredentialScope(now)
            + SIGNED_HEADERS + signedHeaderKeys
            + SIGNATURE + signature;

    result.put(AUTHORIZATION, authorizationHeader);
    return ImmutableMap.copyOf(result);
}
/**
 * Returns the cached session credentials, calling {@code refresh()} first when no
 * credentials have been loaded yet or when the last refresh happened more than
 * {@code CREDENTIALS_DURATION} ago.
 *
 * @return the current session credentials
 */
@Override
public AWSSessionCredentials getCredentials() {
    if (this.credentials == null) {
        // Nothing cached yet.
        refresh();
        return this.credentials;
    }

    if (this.lastRefreshedInstant.isBefore(Instant.now().minus(CREDENTIALS_DURATION))) {
        // Cached credentials are older than the allowed duration.
        refresh();
    }
    return this.credentials;
}
/**
 * Fetches the job details for {@code jobId} from the CodePipeline client and
 * replaces the cached credentials with the artifact credentials found there,
 * recording the time of the refresh.
 */
@Override
public synchronized void refresh() {
    final GetJobDetailsResult jobDetailsResult =
            codePipelineClient.getJobDetails(new GetJobDetailsRequest().withJobId(jobId));

    // Fully qualified: this is the CodePipeline model type, not the auth interface.
    final com.amazonaws.services.codepipeline.model.AWSSessionCredentials artifactCredentials =
            jobDetailsResult.getJobDetails().getData().getArtifactCredentials();

    this.lastRefreshedInstant = Instant.now();
    this.credentials = new BasicSessionCredentials(
            artifactCredentials.getAccessKeyId(),
            artifactCredentials.getSecretAccessKey(),
            artifactCredentials.getSessionToken());
}
/**
 * Creates the session credentials used for the statement.
 *
 * <p>When {@code temp_credentials} is {@code false}, the base credentials are
 * wrapped as session credentials with a {@code null} token. Otherwise an
 * {@code AWSSessionCredentialsFactory} is configured with the optional
 * {@code role_arn} / {@code role_session_name} secrets and the optional
 * {@code session_duration} parameter, and its result is returned.
 *
 * @param config         the task configuration
 * @param secrets        the secret provider consulted for role settings
 * @param baseCredential the long-term credentials to start from
 * @return session credentials for the statement
 */
private AWSSessionCredentials createSessionCredentials(Config config, SecretProvider secrets, BasicAWSCredentials baseCredential) {
    // Evaluated before the temp_credentials check, exactly as the original flow does.
    final List<AcceptableUri> allowedUris = buildAcceptableUriForSessionCredentials(config, baseCredential);

    final boolean useTempCredentials = config.get("temp_credentials", Boolean.class, true);
    if (!useTempCredentials) {
        return new BasicSessionCredentials(
                baseCredential.getAWSAccessKeyId(),
                baseCredential.getAWSSecretKey(),
                null);
    }

    final AWSSessionCredentialsFactory factory = new AWSSessionCredentialsFactory(baseCredential, allowedUris);

    final Optional<String> roleArn = getSecretOptionalValue(secrets, "role_arn");
    if (roleArn.isPresent()) {
        factory.withRoleArn(roleArn.get());
        // The session name is only consulted when a role is assumed.
        final Optional<String> roleSessionName = secrets.getSecretOptional("role_session_name");
        if (roleSessionName.isPresent()) {
            factory.withRoleSessionName(roleSessionName.get());
        }
    }

    final Optional<Integer> sessionDuration = config.getOptional("session_duration", Integer.class);
    if (sessionDuration.isPresent()) {
        factory.withDurationSeconds(sessionDuration.get());
    }

    return factory.get();
}
@Override protected TaskResult run(Config params, Config state, RedshiftConnectionConfig connectionConfig) { UUID queryId; // generate query id if (!state.has(QUERY_ID)) { // this is the first execution of this task logger.debug("Generating query id for a new {} task", type()); queryId = UUID.randomUUID(); state.set(QUERY_ID, queryId); throw TaskExecutionException.ofNextPolling(0, ConfigElement.copyOf(state)); } queryId = state.get(QUERY_ID, UUID.class); BasicAWSCredentials baseCredentials = createBaseCredential(context.getSecrets()); AWSSessionCredentials sessionCredentials = createSessionCredentials(params, context.getSecrets(), baseCredentials); T statementConfig = createStatementConfig(params, sessionCredentials, queryId.toString()); beforeConnect(baseCredentials, statementConfig); pollingRetryExecutor(TaskState.of(state), "load") .retryIf(LockConflictException.class, x -> true) .withErrorMessage("Redshift Load/Unload operation failed") .runAction(s -> executeTask(params, connectionConfig, statementConfig, queryId)); return TaskResult.defaultBuilder(request).build(); }
/**
 * Builds the unload configuration from the task config and credentials.
 *
 * <p>{@code query} and {@code to} are read with {@code config.get}; every other
 * setting is read with {@code config.getOptional}. The session token is set only
 * when the credentials carry a non-null token.
 *
 * @param config             the task configuration
 * @param sessionCredentials the credentials copied into the unload config
 * @param queryId            passed to {@code setupWithPrefixDir}
 * @return the configured unload config
 */
@VisibleForTesting
RedshiftConnection.UnloadConfig createUnloadConfig(Config config, AWSSessionCredentials sessionCredentials, String queryId) {
    RedshiftConnection.UnloadConfig result = new RedshiftConnection.UnloadConfig();
    result.configure(target -> {
        // Credentials
        target.accessKeyId = sessionCredentials.getAWSAccessKeyId();
        target.secretAccessKey = sessionCredentials.getAWSSecretKey();
        if (sessionCredentials.getSessionToken() != null) {
            target.sessionToken = Optional.of(sessionCredentials.getSessionToken());
        }

        // Settings read with config.get
        target.query = config.get("query", String.class);
        target.to = config.get("to", String.class);

        // Optional settings
        target.manifest = config.getOptional("manifest", Boolean.class);
        target.encrypted = config.getOptional("encrypted", Boolean.class);
        target.allowoverwrite = config.getOptional("allowoverwrite", Boolean.class);
        target.delimiter = config.getOptional("delimiter", String.class);
        target.fixedwidth = config.getOptional("fixedwidth", String.class);
        target.gzip = config.getOptional("gzip", Boolean.class);
        target.bzip2 = config.getOptional("bzip2", Boolean.class);
        target.nullAs = config.getOptional("null_as", String.class);
        target.escape = config.getOptional("escape", Boolean.class);
        target.addquotes = config.getOptional("addquotes", Boolean.class);
        target.parallel = config.getOptional("parallel", String.class);

        target.setupWithPrefixDir(queryId);
    });
    return result;
}
@Override public MultiEnvironment bind(@Nonnull Run<?, ?> build, FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException { AWSCredentials credentials = getCredentials(build).getCredentials(); Map<String,String> m = new HashMap<String,String>(); m.put(accessKeyVariable, credentials.getAWSAccessKeyId()); m.put(secretKeyVariable, credentials.getAWSSecretKey()); // If role has been assumed, STS requires AWS_SESSION_TOKEN variable set too. if(credentials instanceof AWSSessionCredentials) { m.put(SESSION_TOKEN_VARIABLE_NAME, ((AWSSessionCredentials) credentials).getSessionToken()); } return new MultiEnvironment(m); }
/**
 * Builds the string to sign for the operation, computes its signature with
 * {@code calculateRFC2104HMAC} and sets the authorization and date headers (plus
 * the token header for session credentials) on {@code httpHeaders}.
 *
 * <p>The {@code stringToSignBuilder} field is used as scratch space: first for the
 * string to sign, then (after {@code clear()}) for the authorization header value.
 * NOTE(review): the builder is not cleared on entry or on exit here; presumably the
 * caller resets it. Confirm against the call site.
 *
 * @param httpHeaders the headers to add the signature headers to
 * @param operation   supplies the HTTP method and appends the resource name
 * @param objectName  passed to {@code operation.getResourceName}
 * @param contentMd5  the Content-MD5 line of the string to sign
 * @param contentType the Content-Type line of the string to sign
 * @param virtualHost appended after the leading '/' of the resource
 */
private void calculateAndAddSignatureInternal(HttpHeaders httpHeaders, Operation operation, String objectName,
                                              String contentMd5, String contentType, String virtualHost) {
    String dateString = RFC_822_DATE_FORMAT.format(System.currentTimeMillis());

    stringToSignBuilder
            .append(operation.getHttpMethod()).append('\n')
            .append(contentMd5).append('\n')
            .append(contentType).append('\n')
            .append(dateString).append('\n');

    AWSCredentials credentials = credentialsProvider.getCredentials();
    if (credentials instanceof AWSSessionCredentials) {
        // The session token is sent as a header and is also part of the signed string.
        String sessionToken = ((AWSSessionCredentials) credentials).getSessionToken();
        httpHeaders.set(HEADER_TOKEN, sessionToken);
        stringToSignBuilder
                .append(HEADER_TOKEN).append(':')
                .append(sessionToken).append('\n');
    }

    stringToSignBuilder.append('/');
    stringToSignBuilder.append(virtualHost);
    operation.getResourceName(stringToSignBuilder, objectName);

    // Use an explicit charset: the no-arg getBytes() depends on the platform default
    // encoding, which would make the HMAC key (and thus the signature) JVM-dependent.
    KeyParameter keyParameter = new KeyParameter(
            credentials.getAWSSecretKey().getBytes(java.nio.charset.StandardCharsets.UTF_8));
    String authorization = calculateRFC2104HMAC(stringToSignBuilder.toString(), keyParameter);

    // Reuse the builder for the Authorization header value.
    stringToSignBuilder.clear();
    stringToSignBuilder.append("AWS ").append(credentials.getAWSAccessKeyId()).append(':').append(authorization);

    httpHeaders.set(HEADER_AUTHORIZATION, stringToSignBuilder.toString());
    httpHeaders.set(HEADER_DATE, dateString);
}
@Override protected void addSessionCredentials(SignableRequest<?> request, AWSSessionCredentials credentials) { // TODO Auto-generated method stub }
/**
 * Adds the session token to the request as the {@code x-amz-security-token}
 * query parameter.
 *
 * @param request     the request being signed
 * @param credentials the session credentials supplying the token
 */
@Override
protected void addSessionCredentials(SignableRequest<?> request, AWSSessionCredentials credentials) {
    final String sessionToken = credentials.getSessionToken();
    request.addParameter("x-amz-security-token", sessionToken);
}
/**
 * Signs the request by adding a {@code Date} header and an {@code Authorization}
 * header of the form {@code "AWS <accessKeyId>:<signature>"}.
 *
 * <p>Nothing is signed (and only a debug message is logged) when the credentials
 * or their secret key are {@code null}.
 *
 * @param request     the request to sign; it is modified in place
 * @param credentials the credentials to sign with
 * @throws UnsupportedOperationException if this signer has no resource path
 */
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
    if (resourcePath == null) {
        throw new UnsupportedOperationException(
                "Cannot sign a request using a dummy S3Signer instance with "
                + "no resource path");
    }

    if (credentials == null || credentials.getAWSSecretKey() == null) {
        log.debug("Canonical string will not be signed, as no AWS Secret Key was provided");
        return;
    }

    final AWSCredentials signingCredentials = sanitizeCredentials(credentials);
    if (signingCredentials instanceof AWSSessionCredentials) {
        addSessionCredentials(request, (AWSSessionCredentials) signingCredentials);
    }

    /*
     * In s3 sigv2, the way slash characters are encoded should be
     * consistent in both the request url and the encoded resource path.
     * Since we have to encode "//" to "/%2F" in the request url to make
     * httpclient works, we need to do the same encoding here for the
     * resource path.
     */
    final String canonicalResourcePath = SdkHttpUtils.appendUri(
            request.getEndpoint().getPath(),
            SdkHttpUtils.urlEncode(resourcePath, true),
            true);

    final Date signatureDate = getSignatureDate(request.getTimeOffset());
    request.addHeader(Headers.DATE, ServiceUtils.formatRfc822Date(signatureDate));

    final String canonicalString = RestUtils.makeS3CanonicalString(
            httpVerb, canonicalResourcePath, request, null, additionalQueryParamsToSign);
    log.debug("Calculated string to sign:\n\"" + canonicalString + "\"");

    final String encodedSignature = super.signAndBase64Encode(
            canonicalString, signingCredentials.getAWSSecretKey(), SigningAlgorithm.HmacSHA1);
    request.addHeader("Authorization",
            "AWS " + signingCredentials.getAWSAccessKeyId() + ":" + encodedSignature);
}
/**
 * Adds the session token to the request as the {@code x-amz-security-token} header.
 *
 * @param request     the request being signed
 * @param credentials the session credentials supplying the token
 */
@Override
protected void addSessionCredentials(SignableRequest<?> request, AWSSessionCredentials credentials) {
    final String sessionToken = credentials.getSessionToken();
    request.addHeader("x-amz-security-token", sessionToken);
}
/**
 * Returns a copy of the given headers extended with the headers needed to
 * authenticate the request: a date header (unless {@code DATE} is already present),
 * the session token (when the current credentials are session credentials) and the
 * {@code AUTHORIZATION} header computed over the canonical request.
 *
 * <p>If a {@code HOST} header is present and contains a ':', everything from the
 * ':' onwards is removed before signing. Only headers for which
 * {@code headerAsString} returns a value take part in the signature.
 *
 * @param uri         the request URI; URL-encoded for the canonical request
 * @param method      the HTTP method
 * @param queryParams the query parameters, rendered by {@code queryParamsString}
 * @param headers     the headers already set on the request; not modified
 * @param payload     the request body; an empty body is hashed when absent
 * @return an immutable map holding the original and the added headers
 */
public Map<String, Object> getSignedHeaders(String uri, String method, Multimap<String, String> queryParams,
                                            Map<String, Object> headers, Optional<byte[]> payload) {
    final LocalDateTime now = clock.get();
    final AWSCredentials credentials = credentialsProvider.getCredentials();

    // Case-insensitive, sorted map: headers are looked up and iterated regardless of case.
    final Map<String, Object> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    result.putAll(headers);

    // Strip the port from the Host header, if there is one.
    final Optional<String> possibleHost = Optional.fromNullable(result.get(HOST))
            .transform(Object::toString);
    final int indexOfPortSymbol = possibleHost.transform(host -> host.indexOf(':')).or(-1);
    if (indexOfPortSymbol > -1) {
        result.put(HOST, possibleHost.get().substring(0, indexOfPortSymbol));
    }

    if (!result.containsKey(DATE)) {
        result.put(X_AMZ_DATE, now.format(BASIC_TIME_FORMAT));
    }
    if (credentials instanceof AWSSessionCredentials) {
        result.put(SESSION_TOKEN, ((AWSSessionCredentials) credentials).getSessionToken());
    }

    final StringBuilder headersString = new StringBuilder();
    final ImmutableList.Builder<String> signedHeaders = ImmutableList.builder();
    for (Map.Entry<String, Object> entry : result.entrySet()) {
        final Optional<String> headerAsString = headerAsString(entry, method);
        if (headerAsString.isPresent()) {
            headersString.append(headerAsString.get()).append(RETURN);
            // Locale.ROOT keeps the lower-casing independent of the JVM default locale
            // (e.g. under a Turkish locale "I" would otherwise become a dotless "ı").
            signedHeaders.add(entry.getKey().toLowerCase(java.util.Locale.ROOT));
        }
    }
    final String signedHeaderKeys = JOINER.join(signedHeaders.build());

    final String canonicalRequest = method + RETURN
            + SdkHttpUtils.urlEncode(uri, true) + RETURN
            + queryParamsString(queryParams) + RETURN
            + headersString.toString() + RETURN
            + signedHeaderKeys + RETURN
            + toBase16(hash(payload.or(EMPTY.getBytes(Charsets.UTF_8))));

    final String stringToSign = createStringToSign(canonicalRequest, now);
    final String signature = sign(stringToSign, now, credentials);
    final String authorizationHeader = AWS4_HMAC_SHA256_CREDENTIAL
            + credentials.getAWSAccessKeyId() + SLASH + getCredentialScope(now)
            + SIGNED_HEADERS + signedHeaderKeys
            + SIGNATURE + signature;

    result.put(AUTHORIZATION, authorizationHeader);
    return ImmutableMap.copyOf(result);
}
/**
 * Creates the statement configuration for an unload by delegating to
 * {@code createUnloadConfig}.
 *
 * @param params             the task parameters
 * @param sessionCredentials the credentials copied into the config
 * @param queryId            the query id of this task
 * @return the unload configuration
 */
@Override
protected RedshiftConnection.UnloadConfig createStatementConfig(Config params, AWSSessionCredentials sessionCredentials, String queryId) {
    final RedshiftConnection.UnloadConfig unloadConfig =
            createUnloadConfig(params, sessionCredentials, queryId);
    return unloadConfig;
}
/**
 * Builds the copy configuration from the task config and credentials.
 *
 * <p>{@code table} and {@code from} are read with {@code config.get}; every other
 * setting is read with {@code config.getOptional}. The session token is set only
 * when the credentials carry a non-null token.
 *
 * @param config             the task configuration
 * @param sessionCredentials the credentials copied into the copy config
 * @return the configured copy config
 */
@VisibleForTesting
RedshiftConnection.CopyConfig createCopyConfig(Config config, AWSSessionCredentials sessionCredentials) {
    RedshiftConnection.CopyConfig result = new RedshiftConnection.CopyConfig();
    result.configure(target -> {
        // Credentials
        target.accessKeyId = sessionCredentials.getAWSAccessKeyId();
        target.secretAccessKey = sessionCredentials.getAWSSecretKey();
        if (sessionCredentials.getSessionToken() != null) {
            target.sessionToken = Optional.of(sessionCredentials.getSessionToken());
        }

        // Target table and data source
        target.table = config.get("table", String.class);
        target.columnList = config.getOptional("column_list", String.class);
        target.from = config.get("from", String.class);
        target.readratio = config.getOptional("readratio", Integer.class);
        target.manifest = config.getOptional("manifest", Boolean.class);
        target.encrypted = config.getOptional("encrypted", Boolean.class);
        target.region = config.getOptional("region", String.class);

        // Data format
        target.csv = config.getOptional("csv", String.class);
        target.delimiter = config.getOptional("delimiter", String.class);
        target.fixedwidth = config.getOptional("fixedwidth", String.class);
        target.json = config.getOptional("json", String.class);
        target.avro = config.getOptional("avro", String.class);
        target.gzip = config.getOptional("gzip", Boolean.class);
        target.bzip2 = config.getOptional("bzip2", Boolean.class);
        target.lzop = config.getOptional("lzop", Boolean.class);

        // Data conversion
        target.acceptanydate = config.getOptional("acceptanydate", Boolean.class);
        target.acceptinvchars = config.getOptional("acceptinvchars", String.class);
        target.blanksasnull = config.getOptional("blanksasnull", Boolean.class);
        target.dateformat = config.getOptional("dateformat", String.class);
        target.emptyasnull = config.getOptional("emptyasnull", Boolean.class);
        target.encoding = config.getOptional("encoding", String.class);
        target.escape = config.getOptional("escape", Boolean.class);
        target.explicitIds = config.getOptional("explicit_ids", Boolean.class);
        target.fillrecord = config.getOptional("fillrecord", Boolean.class);
        target.ignoreblanklines = config.getOptional("ignoreblanklines", Boolean.class);
        target.ignoreheader = config.getOptional("ignoreheader", Integer.class);
        target.nullAs = config.getOptional("null_as", String.class);
        target.removequotes = config.getOptional("removequotes", Boolean.class);
        target.roundec = config.getOptional("roundec", Boolean.class);
        target.timeformat = config.getOptional("timeformat", String.class);
        target.trimblanks = config.getOptional("trimblanks", Boolean.class);
        target.truncatecolumns = config.getOptional("truncatecolumns", Boolean.class);

        // Load behaviour
        target.comprows = config.getOptional("comprows", Integer.class);
        target.compupdate = config.getOptional("compupdate", String.class);
        target.maxerror = config.getOptional("maxerror", Integer.class);
        target.noload = config.getOptional("noload", Boolean.class);
        target.statupdate = config.getOptional("statupdate", String.class);
    });
    return result;
}
/**
 * Creates the statement configuration for a load by delegating to
 * {@code createCopyConfig}. The query id is not used by this implementation.
 *
 * @param params             the task parameters
 * @param sessionCredentials the credentials copied into the config
 * @param queryId            ignored
 * @return the copy configuration
 */
@Override
protected RedshiftConnection.CopyConfig createStatementConfig(Config params, AWSSessionCredentials sessionCredentials, String queryId) {
    final RedshiftConnection.CopyConfig copyConfig = createCopyConfig(params, sessionCredentials);
    return copyConfig;
}
/**
 * Does nothing: this override adds no session credentials to the request.
 *
 * @param arg0 the request being signed; not modified
 * @param arg1 the session credentials; ignored
 */
@Override
protected void addSessionCredentials(Request<?> arg0, AWSSessionCredentials arg1) {
    // Empty body: nothing is added to the request.
}
/**
 * Aggregates vendor sales records and writes the summary to Redshift.
 *
 * <p>Reads comma-separated records (vendor_id, trans_amount, trans_type, item_id,
 * trans_date) from {@code jobInputParam}, sums {@code trans_amount} per
 * vendor/item/date separately for transaction types "4", "5" and "6", joins the
 * three results and writes them to a uniquely named temp table. The
 * {@code postactions} SQL then merges the temp table into
 * {@code vendortranssummary} and drops it.
 *
 * <p>The write uses temporary AWS credentials from the
 * {@link DefaultAWSCredentialsProviderChain}, so the chain must resolve to session
 * credentials.
 *
 * @param jobInputParam the path of the input text file(s)
 * @throws IllegalStateException if the resolved AWS credentials carry no session token
 * @throws Exception             declared by the original signature
 */
public static void run(String jobInputParam) throws Exception {
    List<StructField> schemaFields = new ArrayList<StructField>();
    schemaFields.add(DataTypes.createStructField("vendor_id", DataTypes.StringType, true));
    schemaFields.add(DataTypes.createStructField("trans_amount", DataTypes.StringType, true));
    schemaFields.add(DataTypes.createStructField("trans_type", DataTypes.StringType, true));
    schemaFields.add(DataTypes.createStructField("item_id", DataTypes.StringType, true));
    schemaFields.add(DataTypes.createStructField("trans_date", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(schemaFields);

    SparkConf conf = new SparkConf().setAppName("Spark Redshift No Access-Keys");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

    String redshiftJDBCURL = props.getProperty("redshift.jdbc.url");
    String s3TempPath = props.getProperty("s3.temp.path");
    // NOTE(review): this prints every property, including the JDBC URL; confirm that
    // no credentials are embedded in it before keeping this output.
    System.out.println("props" + props);

    JavaRDD<Row> salesRDD = sc.textFile(jobInputParam).map(new Function<String, Row>() {
        public Row call(String saleRec) {
            String[] fields = saleRec.split(",");
            return RowFactory.create(fields[0], fields[1], fields[2], fields[3], fields[4]);
        }
    });
    Dataset<Row> salesDF = spark.createDataFrame(salesRDD, schema);

    // One aggregate per transaction type: "4", "5" and "6".
    Dataset<Row> vendorItemSaleAmountDF = salesDF.filter(salesDF.col("trans_type").equalTo("4"))
            .groupBy(salesDF.col("vendor_id"), salesDF.col("item_id"), salesDF.col("trans_date"))
            .agg(ImmutableMap.of("trans_amount", "sum"));
    Dataset<Row> vendorItemTaxAmountDF = salesDF.filter(salesDF.col("trans_type").equalTo("5"))
            .groupBy(salesDF.col("vendor_id"), salesDF.col("item_id"), salesDF.col("trans_date"))
            .agg(ImmutableMap.of("trans_amount", "sum"));
    Dataset<Row> vendorItemDiscountAmountDF = salesDF.filter(salesDF.col("trans_type").equalTo("6"))
            .groupBy(salesDF.col("vendor_id"), salesDF.col("item_id"), salesDF.col("trans_date"))
            .agg(ImmutableMap.of("trans_amount", "sum"));

    String[] joinColArray = {"vendor_id", "item_id", "trans_date"};
    vendorItemSaleAmountDF.printSchema();
    Seq<String> commonJoinColumns =
            scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(joinColArray)).seq();

    Dataset<Row> vendorAggregatedDF = vendorItemSaleAmountDF
            .join(vendorItemTaxAmountDF, commonJoinColumns, "left_outer")
            .join(vendorItemDiscountAmountDF, commonJoinColumns, "left_outer")
            .toDF("vendor_id", "item_id", "trans_date", "sale_amount", "tax_amount", "discount_amount");
    vendorAggregatedDF.printSchema();

    // The temporary_aws_* options below need a session token. Check the credential
    // type explicitly so long-term credentials fail with an actionable message
    // instead of an opaque ClassCastException.
    DefaultAWSCredentialsProviderChain provider = new DefaultAWSCredentialsProviderChain();
    com.amazonaws.auth.AWSCredentials resolvedCredentials = provider.getCredentials();
    if (!(resolvedCredentials instanceof AWSSessionCredentials)) {
        throw new IllegalStateException(
                "Temporary (session) AWS credentials are required to write to Redshift, but the "
                + "default credentials provider chain returned credentials without a session token");
    }
    AWSSessionCredentials creds = (AWSSessionCredentials) resolvedCredentials;

    // Suffix that makes the temp table name unique for this run.
    String appendix = System.currentTimeMillis() + "_" + (new Random().nextInt(10) + 1);
    String tempTable = "vendortranssummary_temp" + appendix;

    String vendorTransSummarySQL = new StringBuilder("begin transaction;delete from vendortranssummary using ")
            .append(tempTable)
            .append(" where vendortranssummary.vendor_id=")
            .append(tempTable)
            .append(".vendor_id and vendortranssummary.item_id=")
            .append(tempTable)
            .append(".item_id and vendortranssummary.trans_date = ")
            .append(tempTable)
            .append(".trans_date;")
            .append("insert into vendortranssummary select * from ")
            .append(tempTable)
            .append(";drop table ")
            .append(tempTable)
            .append(";end transaction;")
            .toString();

    vendorAggregatedDF.write().format("com.databricks.spark.redshift")
            .option("url", redshiftJDBCURL)
            .option("dbtable", tempTable)
            .option("usestagingtable", "false")
            .option("postactions", vendorTransSummarySQL)
            .option("temporary_aws_access_key_id", creds.getAWSAccessKeyId())
            .option("temporary_aws_secret_access_key", creds.getAWSSecretKey())
            .option("temporary_aws_session_token", creds.getSessionToken())
            .option("tempdir", s3TempPath)
            .mode(SaveMode.Overwrite)
            .save();
}
/**
 * Creates the statement configuration of type {@code T} for this operator.
 *
 * @param params             the task parameters
 * @param sessionCredentials the AWS session credentials to use for the statement
 * @param queryId            the query id of the task
 * @return the statement configuration
 */
protected abstract T createStatementConfig(Config params, AWSSessionCredentials sessionCredentials, String queryId);