Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ updates:
directory: "/"
schedule:
interval: "daily"
open-pull-requests-limit: 10
4 changes: 2 additions & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ jobs:
java: ["8", "11", "17"]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v2
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
distribution: temurin
cache: maven
- name: Build with Maven
run: mvn --batch-mode --file pom.xml package
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.1-jre</version>
<version>32.1.2-jre</version>
</dependency>

<!-- Test Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.9.3</version>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/wavefront/sdk/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -832,4 +832,8 @@ public static double convertSemVerToGauge(String version) {
}
return 0.0D;
}

public static boolean isNullOrEmpty(final String string) {
return string == null || string.isEmpty();
}
}
154 changes: 127 additions & 27 deletions src/main/java/com/wavefront/sdk/common/clients/WavefrontClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import com.wavefront.sdk.common.annotation.NonNull;
import com.wavefront.sdk.common.annotation.Nullable;
import com.wavefront.sdk.common.clients.service.ReportingService;
import com.wavefront.sdk.common.clients.service.token.CSPServerToServerTokenService;
import com.wavefront.sdk.common.clients.service.token.NoopTokenService;
import com.wavefront.sdk.common.clients.service.token.TokenService;
import com.wavefront.sdk.common.clients.service.token.WavefrontTokenService;
import com.wavefront.sdk.common.logging.MessageDedupingLogger;
import com.wavefront.sdk.common.metrics.WavefrontSdkDeltaCounter;
import com.wavefront.sdk.common.metrics.WavefrontSdkMetricsRegistry;
Expand Down Expand Up @@ -137,10 +141,17 @@ public class WavefrontClient implements WavefrontSender, Runnable {
// Flag to prevent sending after close() has been called
private final AtomicBoolean closed = new AtomicBoolean(false);

private final TokenService tokenService;

public static class Builder {
private static final String CSP_DEFAULT_BASE_URL = "https://console.cloud.vmware.com/";
// Required parameters
private final String server;
private final String token;
private final String cspClientId;
private final String cspClientSecret;

private String cspBaseUrl;

// Optional parameters
private int metricsPort = -1;
Expand All @@ -152,7 +163,7 @@ public static class Builder {
private TimeUnit flushIntervalTimeUnit = TimeUnit.SECONDS;
private int messageSizeBytes = Integer.MAX_VALUE;
private boolean includeSdkMetrics = true;
private Map<String, String> tags = Maps.newHashMap();
private final Map<String, String> tags = Maps.newHashMap();

private URI metricsUri;
private URI tracesUri;
Expand All @@ -165,17 +176,46 @@ public static class Builder {
* @param token A valid API token with direct ingestion permissions
*/
public Builder(String server, @Nullable String token) {
this(server, token, null, null, null);
}

/**
* Create a new WavefrontClient.Builder
*
* @param server A server URL of the form "https://clusterName.wavefront.com" or
* "http://internal.proxy.com:port"
* @param token A valid API token with direct ingestion permissions
* @param cspBaseUrl A server URL that points to the CSP service
* @param cspClientId Client ID for CSP
* @param cspClientSecret Client Secret for CSP
*/
private Builder(String server, @Nullable String token, @Nullable String cspBaseUrl, @Nullable String cspClientId, @Nullable String cspClientSecret) {
this.server = server;
this.token = token;
this.cspBaseUrl = cspBaseUrl;
this.cspClientId = cspClientId;
this.cspClientSecret = cspClientSecret;
}

/**
* Create a new WavefrontClient.Builder
*
* @param server A server URL of the form "https://clusterName.wavefront.com" or
* "http://internal.proxy.com:port"
* @param cspClientId Client ID for CSP
* @param cspClientSecret Client Secret for CSP
*/
public Builder(String server, @Nullable String cspClientId, @Nullable String cspClientSecret) {
this(server, null, CSP_DEFAULT_BASE_URL, cspClientId, cspClientSecret);
}

/**
* Create a new WavefrontClient.Builder
*
* @param proxyServer A server URL of the the form "http://internal.proxy.com:port"
* @param proxyServer A server URL of the form "http://internal.proxy.com:port"
*/
public Builder(String proxyServer) {
this(proxyServer, null);
this(proxyServer, null, null, null, null);
}

/**
Expand All @@ -189,6 +229,11 @@ public Builder maxQueueSize(int maxQueueSize) {
return this;
}

public Builder cspBaseUrl(final String cspBaseUrl) {
this.cspBaseUrl = cspBaseUrl;
return this;
}

/**
* Set batch size to be reported during every flush.
*
Expand Down Expand Up @@ -274,6 +319,7 @@ public Builder includeSdkMetrics(boolean includeSdkMetrics) {

/**
* Set the tags to apply to the internal SDK metrics
*
* @param tags a map of point tags to apply to the internal sdk metrics
* @return {@code this}
*/
Expand Down Expand Up @@ -332,7 +378,7 @@ public WavefrontClient build() {
this.metricsUri = buildUri(this.server, this.metricsPort);
this.tracesUri = buildUri(this.server, this.tracesPort);
} catch (URISyntaxException e) {
throw new IllegalStateException(e);
throw new IllegalStateException(e);
}

return new WavefrontClient(this);
Expand All @@ -345,7 +391,7 @@ private URI buildUri(String server, int port) throws URISyntaxException {
}

return new URI(uri.getScheme(), null, uri.getHost(), port, uri.getPath(), uri.getQuery(),
uri.getFragment());
uri.getFragment());
}
}

Expand All @@ -359,6 +405,26 @@ private WavefrontClient(Builder builder) {
}
defaultSource = tempSource;

if (!Utils.isNullOrEmpty(builder.token)) {
tokenService = new WavefrontTokenService(builder.token);
} else if (!Utils.isNullOrEmpty(builder.cspBaseUrl) && !Utils.isNullOrEmpty(builder.cspClientId) && !Utils.isNullOrEmpty(builder.cspClientSecret)) {
tokenService = new CSPServerToServerTokenService(builder.cspBaseUrl, builder.cspClientId, builder.cspClientSecret);
} else {
tokenService = new NoopTokenService();
}

switch (tokenService.getClass().getSimpleName()) {
case "CSPServerToServerTokenService":
logger.log(Level.INFO, "The Wavefront SDK will use CSP authentication when communicating with the Wavefront Backend for Direct Ingestion.");
break;
case "WavefrontTokenService":
logger.log(Level.INFO, "The Wavefront SDK will use an API TOKEN when communicating with the Wavefront Backend for Direct Ingestion.");
break;
case "NoopTokenService":
logger.log(Level.INFO, "The Wavefront SDK will communicate with a Wavefront Proxy.");
break;
}

batchSize = builder.batchSize;
messageSizeBytes = builder.messageSizeBytes;
metricsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
Expand All @@ -367,8 +433,8 @@ private WavefrontClient(Builder builder) {
spanLogsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
eventsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
logsBuffer = new LinkedBlockingQueue<>(builder.maxQueueSize);
metricsReportingService = new ReportingService(builder.metricsUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds);
tracesReportingService = new ReportingService(builder.tracesUri, builder.token, builder.reportingServiceLogSuppressTimeSeconds);
metricsReportingService = new ReportingService(builder.metricsUri, tokenService, builder.reportingServiceLogSuppressTimeSeconds);
tracesReportingService = new ReportingService(builder.tracesUri, tokenService, builder.reportingServiceLogSuppressTimeSeconds);
scheduler = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("wavefrontClientSender").setDaemon(true));
scheduler.scheduleAtFixedRate(this, 1, builder.flushInterval, builder.flushIntervalTimeUnit);
Expand Down Expand Up @@ -440,13 +506,21 @@ private WavefrontClient(Builder builder) {
this.clientId = builder.server;
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public String getClientId() {
return clientId;
}

/** {@inheritDoc} */
public TokenService getTokenService() {
return tokenService;
}

/**
* {@inheritDoc}
*/
@Override
public void sendMetric(String name, double value, @Nullable Long timestamp,
@Nullable String source, @Nullable Map<String, String> tags)
Expand All @@ -472,7 +546,9 @@ public void sendMetric(String name, double value, @Nullable Long timestamp,
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void sendFormattedMetric(String point) throws IOException {
if (closed.get()) {
Expand All @@ -494,7 +570,9 @@ public void sendFormattedMetric(String point) throws IOException {
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void sendDistribution(String name, List<Pair<Double, Integer>> centroids,
Set<HistogramGranularity> histogramGranularities,
Expand Down Expand Up @@ -524,10 +602,12 @@ public void sendDistribution(String name, List<Pair<Double, Integer>> centroids,
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void sendLog(String name, double value, Long timestamp, String source,
Map<String, String> tags) throws IOException {
Map<String, String> tags) throws IOException {
if (closed.get()) {
throw new IOException("attempt to send using closed sender");
}
Expand All @@ -550,7 +630,9 @@ public void sendLog(String name, double value, Long timestamp, String source,
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void sendEvent(String name, long startMillis, long endMillis, @Nullable String source,
@Nullable Map<String, String> tags,
Expand Down Expand Up @@ -584,7 +666,9 @@ public void sendEvent(String name, long startMillis, long endMillis, @Nullable S
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void sendSpan(String name, long startMillis, long durationMillis,
@Nullable String source, UUID traceId, UUID spanId,
Expand All @@ -611,7 +695,7 @@ public void sendSpan(String name, long startMillis, long durationMillis,
String spanSecondaryId = null;
if (tags != null) {
spanSecondaryId = tags.stream().filter(pair -> pair._1.equals(SPAN_SECONDARY_ID_KEY))
.map(pair -> pair._2).findFirst().orElse(null);
.map(pair -> pair._2).findFirst().orElse(null);
}
sendSpanLogs(traceId, spanId, spanLogs, span, spanSecondaryId);
}
Expand Down Expand Up @@ -647,7 +731,9 @@ private void sendSpanLogs(
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void run() {
try {
Expand All @@ -658,7 +744,9 @@ public void run() {
}
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public void flush() throws IOException {
if (closed.get()) {
Expand Down Expand Up @@ -702,6 +790,14 @@ private void internalFlush(LinkedBlockingQueue<String> buffer, String format,
LogMessageType permissionsMessageType,
LogMessageType bufferFullMessageType)
throws IOException {

String tokenIdentifier = "";
if (tokenService.getClass().equals(CSPServerToServerTokenService.class)) {
tokenIdentifier = "CSP ACCESS TOKEN";
} else if (tokenService.getClass().equals(WavefrontTokenService.class)) {
tokenIdentifier = "API TOKEN";
}

ReportingService entityReportingService;
switch (format) {
case Constants.WAVEFRONT_SPAN_LOG_FORMAT:
Expand All @@ -727,7 +823,7 @@ private void internalFlush(LinkedBlockingQueue<String> buffer, String format,
switch (featureDisabledReason) {
case 401:
logger.log(permissionsMessageType.toString(), Level.SEVERE,
"Please verify that your API Token is correct! All " + entityType + " will be " +
"Please verify that your " + tokenIdentifier + " is correct! All " + entityType + " will be " +
"discarded until the service is restarted.");
break;
case 403:
Expand Down Expand Up @@ -757,7 +853,7 @@ private void internalFlush(LinkedBlockingQueue<String> buffer, String format,
case 401:
logger.log(permissionsMessageType.toString(), Level.SEVERE,
"Error sending " + entityType + " to Wavefront (HTTP " + statusCode + "). " +
"Please verify that your API Token is correct! All " + entityType + " will " +
"Please verify that your " + tokenIdentifier + " is correct! All " + entityType + " will " +
"be discarded until the service is restarted.");
featureDisabledStatusCode.set(statusCode);
dropped.inc(items.size());
Expand Down Expand Up @@ -824,14 +920,18 @@ private InputStream itemsToStream(List<String> items) {
return new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public int getFailureCount() {
return (int) (pointReportErrors.count() + histogramReportErrors.count() +
spanReportErrors.count() + eventsReportErrors.count());
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override
public synchronized void close() {
if (!closed.compareAndSet(false, true)) {
Expand Down Expand Up @@ -859,13 +959,13 @@ public synchronized void close() {
/**
* Dequeue and return a batch of at most N items from buffer (where N = batchSize), broken into
* chunks where each chunk has at most M bytes of data (where M = messageSizeBytes).
*
* <p>
* Visible for testing.
*
* @param buffer The buffer queue to retrieve items from.
* @param batchSize The maximum number of items to retrieve from the buffer.
* @param messageSizeBytes The maximum number of bytes in each chunk.
* @param dropped A counter counting the number of items that are dropped.
* @param buffer The buffer queue to retrieve items from.
* @param batchSize The maximum number of items to retrieve from the buffer.
* @param messageSizeBytes The maximum number of bytes in each chunk.
* @param dropped A counter counting the number of items that are dropped.
* @return A batch of items retrieved from buffer.
*/
static List<List<String>> getBatch(LinkedBlockingQueue<String> buffer, int batchSize,
Expand Down
Loading