diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 089f7736..7429f0b4 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -6,3 +6,4 @@ updates: directory: "/" schedule: interval: "daily" + open-pull-requests-limit: 10 diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 38815042..5ea444d1 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -16,12 +16,12 @@ jobs: java: ["8", "11", "17"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up JDK uses: actions/setup-java@v2 with: java-version: ${{ matrix.java }} - distribution: 'temurin' + distribution: temurin cache: maven - name: Build with Maven run: mvn --batch-mode --file pom.xml package diff --git a/pom.xml b/pom.xml index d1e87d96..81d39029 100644 --- a/pom.xml +++ b/pom.xml @@ -97,14 +97,14 @@ com.google.guava guava - 32.1.1-jre + 32.1.2-jre org.junit.jupiter junit-jupiter - 5.9.3 + 5.10.0 test diff --git a/src/main/java/com/wavefront/sdk/common/Utils.java b/src/main/java/com/wavefront/sdk/common/Utils.java index 72abd905..af90acc3 100644 --- a/src/main/java/com/wavefront/sdk/common/Utils.java +++ b/src/main/java/com/wavefront/sdk/common/Utils.java @@ -832,4 +832,8 @@ public static double convertSemVerToGauge(String version) { } return 0.0D; } + + public static boolean isNullOrEmpty(final String string) { + return string == null || string.isEmpty(); + } } diff --git a/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java b/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java index 9e7afd29..f8433eb6 100644 --- a/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java +++ b/src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java @@ -12,6 +12,10 @@ import com.wavefront.sdk.common.annotation.NonNull; import com.wavefront.sdk.common.annotation.Nullable; import com.wavefront.sdk.common.clients.service.ReportingService; +import com.wavefront.sdk.common.clients.service.token.CSPServerToServerTokenService; +import com.wavefront.sdk.common.clients.service.token.NoopTokenService; +import com.wavefront.sdk.common.clients.service.token.TokenService; +import com.wavefront.sdk.common.clients.service.token.WavefrontTokenService; import com.wavefront.sdk.common.logging.MessageDedupingLogger; import com.wavefront.sdk.common.metrics.WavefrontSdkDeltaCounter; import com.wavefront.sdk.common.metrics.WavefrontSdkMetricsRegistry; @@ -137,10 +141,17 @@ public class WavefrontClient implements WavefrontSender, Runnable { // Flag to prevent sending after close() has been called private final AtomicBoolean closed = new AtomicBoolean(false); + private final TokenService tokenService; + public static class Builder { + private static final String CSP_DEFAULT_BASE_URL = "https://console.cloud.vmware.com/"; // Required parameters private final String server; private final String token; + private final String cspClientId; + private final String cspClientSecret; + + private String cspBaseUrl; // Optional parameters private int metricsPort = -1; @@ -152,7 +163,7 @@ public static class Builder { private TimeUnit flushIntervalTimeUnit = TimeUnit.SECONDS; private int messageSizeBytes = Integer.MAX_VALUE; private boolean includeSdkMetrics = true; - private Map tags = Maps.newHashMap(); + private final Map tags = Maps.newHashMap(); private URI metricsUri; private URI tracesUri; @@ -165,17 +176,46 @@ public static class Builder { * @param token A valid API token with direct ingestion permissions */ public Builder(String server, @Nullable String token) { + this(server, token, null, null, null); + } + + /** + * Create a new WavefrontClient.Builder + * + * @param server A server URL of the form "https://clusterName.wavefront.com" or + * "http://internal.proxy.com:port" + * @param token A valid API token with direct ingestion permissions + * @param cspBaseUrl A server URL that points to the CSP service + * @param cspClientId Client ID for CSP + * @param cspClientSecret Client Secret for CSP + */ + private Builder(String server, @Nullable String token, @Nullable String cspBaseUrl, @Nullable String cspClientId, @Nullable String cspClientSecret) { this.server = server; this.token = token; + this.cspBaseUrl = cspBaseUrl; + this.cspClientId = cspClientId; + this.cspClientSecret = cspClientSecret; + } + + /** + * Create a new WavefrontClient.Builder + * + * @param server A server URL of the form "https://clusterName.wavefront.com" or + * "http://internal.proxy.com:port" + * @param cspClientId Client ID for CSP + * @param cspClientSecret Client Secret for CSP + */ + public Builder(String server, @Nullable String cspClientId, @Nullable String cspClientSecret) { + this(server, null, CSP_DEFAULT_BASE_URL, cspClientId, cspClientSecret); } /** * Create a new WavefrontClient.Builder * - * @param proxyServer A server URL of the the form "http://internal.proxy.com:port" + * @param proxyServer A server URL of the form "http://internal.proxy.com:port" */ public Builder(String proxyServer) { - this(proxyServer, null); + this(proxyServer, null, null, null, null); } /** @@ -189,6 +229,11 @@ public Builder maxQueueSize(int maxQueueSize) { return this; } + public Builder cspBaseUrl(final String cspBaseUrl) { + this.cspBaseUrl = cspBaseUrl; + return this; + } + /** * Set batch size to be reported during every flush. * @@ -274,6 +319,7 @@ public Builder includeSdkMetrics(boolean includeSdkMetrics) { /** * Set the tags to apply to the internal SDK metrics + * * @param tags a map of point tags to apply to the internal sdk metrics * @return {@code this} */ @@ -332,7 +378,7 @@ public WavefrontClient build() { this.metricsUri = buildUri(this.server, this.metricsPort); this.tracesUri = buildUri(this.server, this.tracesPort); } catch (URISyntaxException e) { - throw new IllegalStateException(e); + throw new IllegalStateException(e); } return new WavefrontClient(this); @@ -345,7 +391,7 @@ private URI buildUri(String server, int port) throws URISyntaxException { } return new URI(uri.getScheme(), null, uri.getHost(), port, uri.getPath(), uri.getQuery(), - uri.getFragment()); + uri.getFragment()); } } @@ -359,6 +405,26 @@ private WavefrontClient(Builder builder) { } defaultSource = tempSource; + if (!Utils.isNullOrEmpty(builder.token)) { + tokenService = new WavefrontTokenService(builder.token); + } else if (!Utils.isNullOrEmpty(builder.cspBaseUrl) && !Utils.isNullOrEmpty(builder.cspClientId) && !Utils.isNullOrEmpty(builder.cspClientSecret)) { + tokenService = new CSPServerToServerTokenService(builder.cspBaseUrl, builder.cspClientId, builder.cspClientSecret); + } else { + tokenService = new NoopTokenService(); + } + + switch (tokenService.getClass().getSimpleName()) { + case "CSPServerToServerTokenService": + logger.log(Level.INFO, "The Wavefront SDK will use CSP authentication when communicating with the Wavefront Backend for Direct Ingestion."); + break; + case "WavefrontTokenService": + logger.log(Level.INFO, "The Wavefront SDK will use an API TOKEN when communicating with the Wavefront Backend for Direct Ingestion."); + break; + case "NoopTokenService": + logger.log(Level.INFO, "The Wavefront SDK will communicate with a Wavefront Proxy."); + break; + } + batchSize = builder.batchSize; messageSizeBytes = builder.messageSizeBytes; metricsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize); @@ -367,8 +433,8 @@ private WavefrontClient(Builder builder) { spanLogsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize); eventsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize); logsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize); - metricsReportingService = new ReportingService(builder.metricsUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds); - tracesReportingService = new ReportingService(builder.tracesUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds); + metricsReportingService = new ReportingService(builder.metricsUri, tokenService, builder.reportingServiceLogSuppressTimeSeconds); + tracesReportingService = new ReportingService(builder.tracesUri, tokenService, builder.reportingServiceLogSuppressTimeSeconds); scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("wavefrontClientSender").setDaemon(true)); scheduler.scheduleAtFixedRate(this, 1, builder.flushInterval, builder.flushIntervalTimeUnit); @@ -440,13 +506,21 @@ private WavefrontClient(Builder builder) { this.clientId = builder.server; } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public String getClientId() { return clientId; } - /** {@inheritDoc} */ + public TokenService getTokenService() { + return tokenService; + } + + /** + * {@inheritDoc} + */ @Override public void sendMetric(String name, double value, @Nullable Long timestamp, @Nullable String source, @Nullable Map tags) @@ -472,7 +546,9 @@ public void sendMetric(String name, double value, @Nullable Long timestamp, } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void sendFormattedMetric(String point) throws IOException { if (closed.get()) { @@ -494,7 +570,9 @@ public void sendFormattedMetric(String point) throws IOException { } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void sendDistribution(String name, List> centroids, Set histogramGranularities, @@ -524,10 +602,12 @@ public void sendDistribution(String name, List> centroids, } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void sendLog(String name, double value, Long timestamp, String source, - Map tags) throws IOException { + Map tags) throws IOException { if (closed.get()) { throw new IOException("attempt to send using closed sender"); } @@ -550,7 +630,9 @@ public void sendLog(String name, double value, Long timestamp, String source, } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void sendEvent(String name, long startMillis, long endMillis, @Nullable String source, @Nullable Map tags, @@ -584,7 +666,9 @@ public void sendEvent(String name, long startMillis, long endMillis, @Nullable S } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void sendSpan(String name, long startMillis, long durationMillis, @Nullable String source, UUID traceId, UUID spanId, @@ -611,7 +695,7 @@ public void sendSpan(String name, long startMillis, long durationMillis, String spanSecondaryId = null; if (tags != null) { spanSecondaryId = tags.stream().filter(pair -> pair._1.equals(SPAN_SECONDARY_ID_KEY)) - .map(pair -> pair._2).findFirst().orElse(null); + .map(pair -> pair._2).findFirst().orElse(null); } sendSpanLogs(traceId, spanId, spanLogs, span, spanSecondaryId); } @@ -647,7 +731,9 @@ private void sendSpanLogs( } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void run() { try { @@ -658,7 +744,9 @@ public void run() { } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void flush() throws IOException { if (closed.get()) { @@ -702,6 +790,14 @@ private void internalFlush(LinkedBlockingQueue buffer, String format, LogMessageType permissionsMessageType, LogMessageType bufferFullMessageType) throws IOException { + + String tokenIdentifier = ""; + if (tokenService.getClass().equals(CSPServerToServerTokenService.class)) { + tokenIdentifier = "CSP ACCESS TOKEN"; + } else if (tokenService.getClass().equals(WavefrontTokenService.class)) { + tokenIdentifier = "API TOKEN"; + } + ReportingService entityReportingService; switch (format) { case Constants.WAVEFRONT_SPAN_LOG_FORMAT: @@ -727,7 +823,7 @@ private void internalFlush(LinkedBlockingQueue buffer, String format, switch (featureDisabledReason) { case 401: logger.log(permissionsMessageType.toString(), Level.SEVERE, - "Please verify that your API Token is correct! All " + entityType + " will be " + + "Please verify that your " + tokenIdentifier + " is correct! All " + entityType + " will be " + "discarded until the service is restarted."); break; case 403: @@ -757,7 +853,7 @@ private void internalFlush(LinkedBlockingQueue buffer, String format, case 401: logger.log(permissionsMessageType.toString(), Level.SEVERE, "Error sending " + entityType + " to Wavefront (HTTP " + statusCode + "). " + - "Please verify that your API Token is correct! All " + entityType + " will " + + "Please verify that your " + tokenIdentifier + " is correct! All " + entityType + " will " + "be discarded until the service is restarted."); featureDisabledStatusCode.set(statusCode); dropped.inc(items.size()); @@ -824,14 +920,18 @@ private InputStream itemsToStream(List items) { return new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8)); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public int getFailureCount() { return (int) (pointReportErrors.count() + histogramReportErrors.count() + spanReportErrors.count() + eventsReportErrors.count()); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public synchronized void close() { if (!closed.compareAndSet(false, true)) { @@ -859,13 +959,13 @@ public synchronized void close() { /** * Dequeue and return a batch of at most N items from buffer (where N = batchSize), broken into * chunks where each chunk has at most M bytes of data (where M = messageSizeBytes). - * + *

* Visible for testing. * - * @param buffer The buffer queue to retrieve items from. - * @param batchSize The maximum number of items to retrieve from the buffer. - * @param messageSizeBytes The maximum number of bytes in each chunk. - * @param dropped A counter counting the number of items that are dropped. + * @param buffer The buffer queue to retrieve items from. + * @param batchSize The maximum number of items to retrieve from the buffer. + * @param messageSizeBytes The maximum number of bytes in each chunk. + * @param dropped A counter counting the number of items that are dropped. * @return A batch of items retrieved from buffer. */ static List> getBatch(LinkedBlockingQueue buffer, int batchSize, diff --git a/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java b/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java index 6be06407..0eb3868c 100644 --- a/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java +++ b/src/main/java/com/wavefront/sdk/common/clients/service/ReportingService.java @@ -1,8 +1,10 @@ package com.wavefront.sdk.common.clients.service; import com.google.common.annotations.VisibleForTesting; + import com.wavefront.sdk.common.Constants; -import com.wavefront.sdk.common.annotation.Nullable; +import com.wavefront.sdk.common.Utils; +import com.wavefront.sdk.common.clients.service.token.TokenService; import com.wavefront.sdk.common.logging.MessageSuppressingLogger; import java.io.IOException; @@ -28,7 +30,8 @@ public class ReportingService implements ReportAPI { // This logger is intended to be configurable in the WavefrontClient.Builder. Given that the invoker controls the // configuration, this is not a static logger. private final MessageSuppressingLogger messageSuppressingLogger; - private final String token; + + private final TokenService tokenService; private final URI uri; private static final int CONNECT_TIMEOUT_MILLIS = 30000; @@ -39,13 +42,14 @@ public class ReportingService implements ReportAPI { /** *

Constructor for ReportingService.

* - * @param uri a {@link java.net.URI} object - * @param token a {@link java.lang.String} object + * @param uri a {@link java.net.URI} object + * @param tokenService a {@link TokenService} object * @param reportingServiceLogSuppressTimeSeconds a long */ - public ReportingService(URI uri, @Nullable String token, long reportingServiceLogSuppressTimeSeconds) { + public ReportingService(URI uri, TokenService tokenService, long reportingServiceLogSuppressTimeSeconds) { this.uri = uri; - this.token = token; + this.tokenService = tokenService; + // Setting suppress time to 0 invalidates the cache used by the message suppressing logger and doesn't log anything. // So defaulting to the minimum of 1 second. reportingServiceLogSuppressTimeSeconds = reportingServiceLogSuppressTimeSeconds <= 0 ? 1 : reportingServiceLogSuppressTimeSeconds; @@ -53,7 +57,9 @@ public ReportingService(URI uri, @Nullable String token, long reportingServiceLo ReportingService.class.getCanonicalName()), reportingServiceLogSuppressTimeSeconds, TimeUnit.SECONDS); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public int send(String format, InputStream stream) { HttpURLConnection urlConn = null; @@ -65,9 +71,13 @@ public int send(String format, InputStream stream) { urlConn.setRequestMethod("POST"); urlConn.addRequestProperty("Content-Type", "application/octet-stream"); urlConn.addRequestProperty("Content-Encoding", "gzip"); - if (token != null && !token.equals("")) { + + String token = tokenService.getToken(); + + if (!Utils.isNullOrEmpty(token)) { urlConn.addRequestProperty("Authorization", "Bearer " + token); } + urlConn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); urlConn.setReadTimeout(READ_TIMEOUT_MILLIS); @@ -89,7 +99,9 @@ public int send(String format, InputStream stream) { return statusCode; } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public int sendEvent(InputStream stream) { HttpURLConnection urlConn = null; @@ -100,7 +112,9 @@ public int sendEvent(InputStream stream) { urlConn.setDoOutput(true); urlConn.setRequestMethod("POST"); - if (token != null && !token.equals("")) { + String token = tokenService.getToken(); + + if (!Utils.isNullOrEmpty(token)) { urlConn.addRequestProperty("Authorization", "Bearer " + token); } urlConn.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); diff --git a/src/main/java/com/wavefront/sdk/common/clients/service/token/CSPAuthorizeResponse.java b/src/main/java/com/wavefront/sdk/common/clients/service/token/CSPAuthorizeResponse.java new file mode 100644 index 00000000..adbe4580 --- /dev/null +++ b/src/main/java/com/wavefront/sdk/common/clients/service/token/CSPAuthorizeResponse.java @@ -0,0 +1,23 @@ +package com.wavefront.sdk.common.clients.service.token; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class CSPAuthorizeResponse { + @JsonProperty("id_token") + public String idToken; + + @JsonProperty("token_type") + public String tokenType; + + // This is in seconds + @JsonProperty("expires_in") + public int expiresIn; + + public String scope; + + @JsonProperty("access_token") + public String accessToken; + + @JsonProperty("refresh_token") + public String refreshToken; +} diff --git a/src/main/java/com/wavefront/sdk/common/clients/service/token/CSPServerToServerTokenService.java b/src/main/java/com/wavefront/sdk/common/clients/service/token/CSPServerToServerTokenService.java new file mode 100644 index 00000000..40d039da --- /dev/null +++ b/src/main/java/com/wavefront/sdk/common/clients/service/token/CSPServerToServerTokenService.java @@ -0,0 +1,179 @@ +package com.wavefront.sdk.common.clients.service.token; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.wavefront.sdk.common.NamedThreadFactory; +import com.wavefront.sdk.common.Utils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +public class CSPServerToServerTokenService implements TokenService, Runnable { + private static final Logger log = Logger.getLogger(CSPServerToServerTokenService.class.getCanonicalName()); + + private final static String OAUTH_PATH = "/csp/gateway/am/api/auth/authorize"; + private final static int TEN_MINUTES = 600; + private final static int THIRTY_SECONDS = 30; + private final static int THREE_MINUTES = 180; + private static int DEFAULT_THREAD_DELAY = 60; + + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("csp-server-to-server-token-service")); + private final ObjectMapper mapper = new ObjectMapper(); + private final AtomicBoolean tokenReady = new AtomicBoolean(false); + + private final String cspBaseURL; + private final String cspClientId; + private final String cspClientSecret; + + private final int connectTimeoutMillis; + private final int readTimeoutMillis; + private String cspAccessToken; + + public CSPServerToServerTokenService(final String cspBaseURL, final String cspClientId, final String cspClientSecret) { + this.cspBaseURL = cspBaseURL; + this.cspClientId = cspClientId; + this.cspClientSecret = cspClientSecret; + this.connectTimeoutMillis = 30_000; + this.readTimeoutMillis = 10_000; + } + + public CSPServerToServerTokenService(final String cspBaseURL, final String cspClientId, final String cspClientSecret, final int connectTimeoutMillis, final int readTimeoutMillis) { + this.cspBaseURL = cspBaseURL; + this.cspClientId = cspClientId; + this.cspClientSecret = cspClientSecret; + this.connectTimeoutMillis = connectTimeoutMillis; + this.readTimeoutMillis = readTimeoutMillis; + } + + @Override + public synchronized String getToken() { + // First access gets the token and is blocking, which schedules the next token fetch. + if (!tokenReady.get()) { + run(); + tokenReady.set(true); + } + + return cspAccessToken; + } + + private String getCSPToken() { + HttpURLConnection urlConn = null; + + final String urlParameters = "grant_type=client_credentials"; + final byte[] postData = urlParameters.getBytes(StandardCharsets.UTF_8); + + try { + final URL url = new URL(cspBaseURL + OAUTH_PATH); + urlConn = (HttpURLConnection) url.openConnection(); + urlConn.setDoOutput(true); + urlConn.setRequestMethod("POST"); + urlConn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + urlConn.addRequestProperty("Authorization", "Basic " + buildHttpBasicToken(cspClientId, cspClientSecret)); + urlConn.setRequestProperty("Content-Length", Integer.toString(postData.length)); + + urlConn.setConnectTimeout(connectTimeoutMillis); + urlConn.setReadTimeout(readTimeoutMillis); + + //Send request + final DataOutputStream wr = new DataOutputStream(urlConn.getOutputStream()); + wr.write(postData); + wr.flush(); + wr.close(); + + final int statusCode = urlConn.getResponseCode(); + + if (statusCode == 200) { + try { + final CSPAuthorizeResponse parsedResponse = mapper.readValue(urlConn.getInputStream(), CSPAuthorizeResponse.class); + + if (!hasDirectIngestScope(parsedResponse.scope)) { + log.warning("The CSP response did not find any scope matching 'aoa:directDataIngestion' which is required for Wavefront direct ingestion."); + } + + // Schedule token refresh in the future + int threadDelay = getThreadDelay(parsedResponse.expiresIn); + + log.info("A CSP token has been received. Will schedule the CSP token to be refreshed in: " + threadDelay + " seconds"); + + executor.schedule(this, threadDelay, TimeUnit.SECONDS); + + return parsedResponse.accessToken; + } catch (JsonProcessingException e) { + log.severe("The request to CSP returned invalid json. Please restart your app."); + + return "INVALID_TOKEN"; + } + } else { + log.warning("The request to CSP returned: " + statusCode); + + if (statusCode >= 500 && statusCode < 600) { + log.info("The Wavefront SDK will try to reauthenticate with CSP on the next request."); + tokenReady.set(false); + + return null; + } + + // Anything not 5xx will return INVALID_TOKEN + return "INVALID_TOKEN"; + } + + } catch (IOException ex) { + // Connection Problem + log.warning("Error connecting to CSP: " + ex.getLocalizedMessage()); + + log.info("The Wavefront SDK will try to reauthenticate with CSP on the next request."); + tokenReady.set(false); + + return null; + } + } + + private int getThreadDelay(final int expiresIn) { + int retVal; + + if (expiresIn < TEN_MINUTES) { + retVal = expiresIn - THIRTY_SECONDS; + } else { + retVal = expiresIn - THREE_MINUTES; + } + + if (retVal <= 0) { + retVal = DEFAULT_THREAD_DELAY; + } + + return retVal; + } + + public synchronized void run() { + this.cspAccessToken = getCSPToken(); + } + + private static List parseScopes(final String scope) { + return Arrays.stream(scope.split("\\s")).collect(Collectors.toList()); + } + + public static boolean hasDirectIngestScope(final String scopeList) { + if (!Utils.isNullOrEmpty(scopeList)) { + return parseScopes(scopeList).stream().anyMatch(s -> s.contains("aoa:directDataIngestion") || s.contains("aoa/*") || s.contains("aoa:*")); + } + + return false; + } + + private String buildHttpBasicToken(final String cspClientId, final String cspClientSecret) { + final String encodeMe = cspClientId + ":" + cspClientSecret; + return Base64.getEncoder().encodeToString(encodeMe.getBytes()); + } +} diff --git a/src/main/java/com/wavefront/sdk/common/clients/service/token/NoopTokenService.java b/src/main/java/com/wavefront/sdk/common/clients/service/token/NoopTokenService.java new file mode 100644 index 00000000..12173907 --- /dev/null +++ b/src/main/java/com/wavefront/sdk/common/clients/service/token/NoopTokenService.java @@ -0,0 +1,13 @@ +package com.wavefront.sdk.common.clients.service.token; + +// Primarily for proxy usage +public class NoopTokenService implements TokenService { + + public NoopTokenService() { + } + + @Override + public String getToken() { + return ""; + } +} diff --git a/src/main/java/com/wavefront/sdk/common/clients/service/token/TokenService.java b/src/main/java/com/wavefront/sdk/common/clients/service/token/TokenService.java new file mode 100644 index 00000000..89bd97e9 --- /dev/null +++ b/src/main/java/com/wavefront/sdk/common/clients/service/token/TokenService.java @@ -0,0 +1,5 @@ +package com.wavefront.sdk.common.clients.service.token; + +public interface TokenService { + String getToken(); +} diff --git a/src/main/java/com/wavefront/sdk/common/clients/service/token/WavefrontTokenService.java b/src/main/java/com/wavefront/sdk/common/clients/service/token/WavefrontTokenService.java new file mode 100644 index 00000000..0a8053df --- /dev/null +++ b/src/main/java/com/wavefront/sdk/common/clients/service/token/WavefrontTokenService.java @@ -0,0 +1,14 @@ +package com.wavefront.sdk.common.clients.service.token; + +public class WavefrontTokenService implements TokenService { + private final String token; + + public WavefrontTokenService(final String token) { + this.token = token; + } + + @Override + public String getToken() { + return token; + } +} diff --git a/src/test/java/com/wavefront/sdk/common/UtilsTest.java b/src/test/java/com/wavefront/sdk/common/UtilsTest.java index 4c5661df..0a6e8e04 100644 --- a/src/test/java/com/wavefront/sdk/common/UtilsTest.java +++ b/src/test/java/com/wavefront/sdk/common/UtilsTest.java @@ -17,6 +17,7 @@ import static com.wavefront.sdk.common.Utils.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.fail; /** @@ -850,4 +851,13 @@ public void convertSemVerToGauge() throws IOException { assertEquals(1.1010D, Utils.convertSemVerToGauge("1.10.10")); } + @Test + public void testIsNullOrEmpty() { + assertTrue(Utils.isNullOrEmpty(null)); + + assertTrue(Utils.isNullOrEmpty("")); + + assertFalse(Utils.isNullOrEmpty("hello")); + } + } diff --git a/src/test/java/com/wavefront/sdk/common/clients/WavefrontClientTest.java b/src/test/java/com/wavefront/sdk/common/clients/WavefrontClientTest.java index c08b9b33..261f1ab8 100644 --- a/src/test/java/com/wavefront/sdk/common/clients/WavefrontClientTest.java +++ b/src/test/java/com/wavefront/sdk/common/clients/WavefrontClientTest.java @@ -3,10 +3,13 @@ import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; import com.wavefront.sdk.common.Pair; +import com.wavefront.sdk.common.clients.service.token.CSPServerToServerTokenService; +import com.wavefront.sdk.common.clients.service.token.NoopTokenService; +import com.wavefront.sdk.common.clients.service.token.WavefrontTokenService; import com.wavefront.sdk.common.metrics.WavefrontSdkDeltaCounter; import com.wavefront.sdk.entities.histograms.HistogramGranularity; - import com.wavefront.sdk.entities.tracing.SpanLog; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -16,15 +19,28 @@ import org.junit.jupiter.params.provider.ValueSource; import java.net.ServerSocket; -import java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; -import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson; +import static com.github.tomakehurst.wiremock.client.WireMock.matching; +import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; -import static org.easymock.EasyMock.*; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; /** @@ -214,6 +230,28 @@ void canSendTracesToDifferentPort() { @Nested class Builder { + + @Test + public void tokenServiceClassTest() { + WavefrontClient wfClient = new WavefrontClient.Builder("", "TOKEN") + .build(); + assertNotNull(wfClient); + assertNotNull(wfClient.getTokenService()); + assertEquals(wfClient.getTokenService().getClass().getSimpleName(), WavefrontTokenService.class.getSimpleName()); + + wfClient = new WavefrontClient.Builder("") + .build(); + assertNotNull(wfClient); + assertNotNull(wfClient.getTokenService()); + assertEquals(wfClient.getTokenService().getClass().getSimpleName(), NoopTokenService.class.getSimpleName()); + + wfClient = new WavefrontClient.Builder("", "cspClientId", "cspClientSecret") + .build(); + assertNotNull(wfClient); + assertNotNull(wfClient.getTokenService()); + assertEquals(wfClient.getTokenService().getClass().getSimpleName(), CSPServerToServerTokenService.class.getSimpleName()); + } + @Nested class ValidateEndpoint { @Test diff --git a/src/test/java/com/wavefront/sdk/common/clients/service/CSPServerToServerTokenServiceTest.java b/src/test/java/com/wavefront/sdk/common/clients/service/CSPServerToServerTokenServiceTest.java new file mode 100644 index 00000000..76d162f0 --- /dev/null +++ b/src/test/java/com/wavefront/sdk/common/clients/service/CSPServerToServerTokenServiceTest.java @@ -0,0 +1,127 @@ +package com.wavefront.sdk.common.clients.service; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import com.wavefront.sdk.common.clients.service.token.CSPServerToServerTokenService; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.UUID; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static com.wavefront.sdk.common.clients.service.token.CSPServerToServerTokenService.hasDirectIngestScope; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CSPServerToServerTokenServiceTest { + + @Test + public void testHasDirectIngestScope() { + final String uuid = UUID.randomUUID().toString(); + + final String scopeString = "external/" + uuid + "/*/aoa:directDataIngestion external/" + uuid + "/aoa:directDataIngestion csp:org_member"; + + assertTrue(hasDirectIngestScope(scopeString)); + assertFalse(hasDirectIngestScope("no direct data ingestion scope")); + assertFalse(hasDirectIngestScope("")); + assertFalse(hasDirectIngestScope(null)); + assertTrue(hasDirectIngestScope("aoa/*")); + assertTrue(hasDirectIngestScope("some aoa/*")); + assertTrue(hasDirectIngestScope("aoa:*")); + assertTrue(hasDirectIngestScope("some aoa:*")); + } + + @Nested + class WireMockTests { + WireMockServer mockBackend; + + private final String MOCK_RESPONSE = "{\"scope\":\"scope aoa/*\",\"id_token\":null,\"token_type\":\"bearer\",\"expires_in\":1,\"access_token\":\"accessToken\",\"refresh_token\":null}\n"; + private final String MOCK_RESPONSE2 = "{\"scope\":\"scope aoa/*\",\"id_token\":null,\"token_type\":\"bearer\",\"expires_in\":1,\"access_token\":\"accessToken2\",\"refresh_token\":null}\n"; + + @BeforeEach + void setup() { + mockBackend = new WireMockServer(wireMockConfig().dynamicPort()); + } + + @AfterEach + void teardown() { + mockBackend.stop(); + } + + @Test + void testCSPReturnsAccessToken() { + mockBackend.stubFor(WireMock.post(urlPathMatching("/csp/gateway/am/api/auth/authorize")).willReturn(WireMock.ok(MOCK_RESPONSE))); + mockBackend.start(); + + CSPServerToServerTokenService cspServerToServerTokenService = new CSPServerToServerTokenService(mockBackend.baseUrl(), "N/A", "N/A"); + assertNotNull(cspServerToServerTokenService); + assertEquals(cspServerToServerTokenService.getToken(), "accessToken"); + } + + @Test + void testCSPMultipleAccessTokens() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + createWireMockStubWithStates("/csp/gateway/am/api/auth/authorize", Scenario.STARTED, "second", MOCK_RESPONSE); + createWireMockStubWithStates("/csp/gateway/am/api/auth/authorize", "second", "third", MOCK_RESPONSE2); + mockBackend.start(); + + CSPServerToServerTokenService cspServerToServerTokenService = new CSPServerToServerTokenService(mockBackend.baseUrl(), "N/A", "N/A"); + + Field field = CSPServerToServerTokenService.class.getDeclaredField("DEFAULT_THREAD_DELAY"); + field.setAccessible(true); + field.set(cspServerToServerTokenService, 1); + + assertNotNull(cspServerToServerTokenService); + assertEquals(cspServerToServerTokenService.getToken(), "accessToken"); + Thread.sleep(2000); + assertEquals(cspServerToServerTokenService.getToken(), "accessToken2"); + } + + @Test + void testCSPReturns401() { + mockBackend.stubFor(WireMock.post(urlPathMatching("/csp/gateway/am/api/auth/authorize")).willReturn(WireMock.unauthorized())); + mockBackend.start(); + + CSPServerToServerTokenService cspServerToServerTokenService = new CSPServerToServerTokenService(mockBackend.baseUrl(), "N/A", "N/A"); + assertEquals(cspServerToServerTokenService.getToken(), "INVALID_TOKEN"); + } + + @Test + void testCSPReturns500() { + mockBackend.stubFor(WireMock.post(urlPathMatching("/csp/gateway/am/api/auth/authorize")).willReturn(WireMock.serverError())); + mockBackend.start(); + + CSPServerToServerTokenService cspServerToServerTokenService = new CSPServerToServerTokenService(mockBackend.baseUrl(), "N/A", "N/A"); + assertNull(cspServerToServerTokenService.getToken()); + } + + @Test + void testCSPConnectionError() { + mockBackend.stubFor(WireMock.post(urlPathMatching("/csp/gateway/am/api/auth/authorize")).willReturn(WireMock.serverError())); + mockBackend.setGlobalFixedDelay(5_000); + mockBackend.start(); + + CSPServerToServerTokenService cspServerToServerTokenService = new CSPServerToServerTokenService(mockBackend.baseUrl(), "N/A", "N/A", 100, 100); + assertNull(cspServerToServerTokenService.getToken()); + } + + private void createWireMockStubWithStates(final String url, final String currentState, final String nextState, final String responseBody) { + mockBackend.stubFor(post(urlEqualTo(url)) + .inScenario("csp") + .whenScenarioStateIs(currentState) + .willSetStateTo(nextState) + .willReturn(aResponse() + .withStatus(200) + .withBody(responseBody))); + } + + } +} \ No newline at end of file