Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 20 additions & 33 deletions src/main/java/com/schematic/api/EventBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.schematic.api.resources.events.EventsClient;
import com.schematic.api.resources.events.requests.CreateEventBatchRequestBody;
import com.schematic.api.types.CreateEventRequestBody;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -42,10 +41,7 @@ public class EventBuffer implements AutoCloseable {
* @param maxBatchSize Maximum number of events to include in a single batch
* @param flushInterval How often to automatically flush the buffer
*/
public EventBuffer(EventsClient eventsClient,
SchematicLogger logger,
int maxBatchSize,
Duration flushInterval) {
public EventBuffer(EventsClient eventsClient, SchematicLogger logger, int maxBatchSize, Duration flushInterval) {
this.events = new ConcurrentLinkedQueue<>();
this.maxBatchSize = maxBatchSize > 0 ? maxBatchSize : DEFAULT_MAX_BATCH_SIZE;
this.flushInterval = flushInterval != null ? flushInterval : DEFAULT_FLUSH_INTERVAL;
Expand Down Expand Up @@ -120,24 +116,20 @@ public void flush() {

private void sendBatchWithRetry(List<CreateEventRequestBody> batch, int retryCount) {
try {
CreateEventBatchRequestBody requestBody = CreateEventBatchRequestBody.builder()
.events(batch)
.build();
CreateEventBatchRequestBody requestBody =
CreateEventBatchRequestBody.builder().events(batch).build();

eventsClient.createEventBatch(requestBody);
processedEvents.addAndGet(batch.size());

} catch (Exception e) {
if (retryCount < MAX_RETRY_ATTEMPTS) {
long delayMillis = RETRY_INITIAL_DELAY.toMillis() * (1L << retryCount);
logger.warn("Failed to send event batch, attempting retry %d of %d in %d ms",
retryCount + 1, MAX_RETRY_ATTEMPTS, delayMillis);

scheduler.schedule(
() -> sendBatchWithRetry(batch, retryCount + 1),
delayMillis,
TimeUnit.MILLISECONDS
);
logger.warn(
"Failed to send event batch, attempting retry %d of %d in %d ms",
retryCount + 1, MAX_RETRY_ATTEMPTS, delayMillis);

scheduler.schedule(() -> sendBatchWithRetry(batch, retryCount + 1), delayMillis, TimeUnit.MILLISECONDS);
} else {
failedEvents.addAndGet(batch.size());
logger.error("Failed to flush events: " + e.getMessage());
Expand All @@ -147,17 +139,16 @@ private void sendBatchWithRetry(List<CreateEventRequestBody> batch, int retryCou

private void startPeriodicFlush() {
scheduler.scheduleAtFixedRate(
() -> {
try {
flush();
} catch (Exception e) {
logger.error("Error during periodic flush: %s", e.getMessage());
}
},
flushInterval.toMillis(),
flushInterval.toMillis(),
TimeUnit.MILLISECONDS
);
() -> {
try {
flush();
} catch (Exception e) {
logger.error("Error during periodic flush: %s", e.getMessage());
}
},
flushInterval.toMillis(),
flushInterval.toMillis(),
TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -195,12 +186,8 @@ public void close() {
*/
public String getMetrics() {
return String.format(
"EventBuffer Metrics - Processed: %d, Dropped: %d, Failed: %d, Current Queue Size: %d",
processedEvents.get(),
droppedEvents.get(),
failedEvents.get(),
events.size()
);
"EventBuffer Metrics - Processed: %d, Dropped: %d, Failed: %d, Current Queue Size: %d",
processedEvents.get(), droppedEvents.get(), failedEvents.get(), events.size());
}

/**
Expand Down
99 changes: 49 additions & 50 deletions src/main/java/com/schematic/api/Schematic.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,29 @@ private Schematic(Builder builder) {
super(buildClientOptions(builder.apiKey, builder));

this.apiKey = builder.apiKey;
this.eventBufferInterval = builder.eventBufferInterval != null ?
builder.eventBufferInterval :
Duration.ofMillis(5000);
this.eventBufferInterval =
builder.eventBufferInterval != null ? builder.eventBufferInterval : Duration.ofMillis(5000);
this.logger = builder.logger != null ? builder.logger : new ConsoleLogger();
this.flagDefaults = builder.flagDefaults != null ? builder.flagDefaults : new HashMap<>();
this.offline = builder.offline;
this.flagCheckCacheProviders = builder.cacheProviders != null ?
builder.cacheProviders :
Collections.singletonList(new LocalCache<>());
this.flagCheckCacheProviders =
builder.cacheProviders != null ? builder.cacheProviders : Collections.singletonList(new LocalCache<>());

this.eventBuffer = new EventBuffer(
super.events(),
this.logger,
builder.eventBufferMaxSize,
builder.eventBufferInterval != null ? builder.eventBufferInterval : Duration.ofMillis(5000)
);

this.shutdownHook = new Thread(() -> {
try {
this.eventBuffer.close();
} catch (Exception e) {
logger.error("Error during Schematic shutdown: " + e.getMessage());
}
}, "SchematicShutdownHook");
super.events(),
this.logger,
builder.eventBufferMaxSize,
builder.eventBufferInterval != null ? builder.eventBufferInterval : Duration.ofMillis(5000));

this.shutdownHook = new Thread(
() -> {
try {
this.eventBuffer.close();
} catch (Exception e) {
logger.error("Error during Schematic shutdown: " + e.getMessage());
}
},
"SchematicShutdownHook");

Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
Expand Down Expand Up @@ -137,10 +136,10 @@ public Schematic build() {
private static ClientOptions buildClientOptions(String apiKey, Builder builder) {
String basePath = builder.basePath != null ? builder.basePath : "https://api.schematichq.com";
return ClientOptions.builder()
.environment(Environment.custom(basePath))
.addHeader("Authorization", "Bearer " + apiKey)
.addHeader("Content-Type", "application/json")
.build();
.environment(Environment.custom(basePath))
.addHeader("Authorization", "Bearer " + apiKey)
.addHeader("Content-Type", "application/json")
.build();
}

public List<CacheProvider<Boolean>> getFlagCheckCacheProviders() {
Expand Down Expand Up @@ -176,10 +175,8 @@ public boolean checkFlag(String flagKey, Map<String, String> company, Map<String
}

// Make API call
CheckFlagRequestBody request = CheckFlagRequestBody.builder()
.company(company)
.user(user)
.build();
CheckFlagRequestBody request =
CheckFlagRequestBody.builder().company(company).user(user).build();

CheckFlagResponse response = features().checkFlag(flagKey, request);
boolean value = response.getData().getValue();
Expand All @@ -196,45 +193,47 @@ public boolean checkFlag(String flagKey, Map<String, String> company, Map<String
}
}

public void identify(Map<String, String> keys, EventBodyIdentifyCompany company, String name, Map<String, Object> traits) {
public void identify(
Map<String, String> keys, EventBodyIdentifyCompany company, String name, Map<String, Object> traits) {
if (offline) return;

try {
EventBodyIdentify body = EventBodyIdentify.builder()
.keys(keys)
.company(company)
.name(name)
.traits(objectMapToJsonNode(traits))
.build();
.keys(keys)
.company(company)
.name(name)
.traits(objectMapToJsonNode(traits))
.build();

CreateEventRequestBody event = CreateEventRequestBody.builder()
.eventType(CreateEventRequestBodyEventType.IDENTIFY)
.body(EventBody.of(body))
.sentAt(OffsetDateTime.now())
.build();
.eventType(CreateEventRequestBodyEventType.IDENTIFY)
.body(EventBody.of(body))
.sentAt(OffsetDateTime.now())
.build();

eventBuffer.push(event);
} catch (Exception e) {
logger.error("Error sending identify event: " + e.getMessage());
}
}

public void track(String eventName, Map<String, String> company, Map<String, String> user, Map<String, Object> traits) {
public void track(
String eventName, Map<String, String> company, Map<String, String> user, Map<String, Object> traits) {
if (offline) return;

try {
EventBodyTrack body = EventBodyTrack.builder()
.event(eventName)
.company(company)
.user(user)
.traits(objectMapToJsonNode(traits))
.build();
.event(eventName)
.company(company)
.user(user)
.traits(objectMapToJsonNode(traits))
.build();

CreateEventRequestBody event = CreateEventRequestBody.builder()
.eventType(CreateEventRequestBodyEventType.TRACK)
.body(EventBody.of(body))
.sentAt(OffsetDateTime.now())
.build();
.eventType(CreateEventRequestBodyEventType.TRACK)
.body(EventBody.of(body))
.sentAt(OffsetDateTime.now())
.build();

eventBuffer.push(event);
} catch (Exception e) {
Expand Down Expand Up @@ -286,8 +285,8 @@ private Map<String, JsonNode> objectMapToJsonNode(Map<String, Object> map) {

private String serializeMap(Map<String, String> map) {
return map.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(";"));
.sorted(Map.Entry.comparingByKey())
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(";"));
}
}
4 changes: 3 additions & 1 deletion src/main/java/com/schematic/api/cache/CacheProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

public interface CacheProvider<T> {
T get(String key);

void set(String key, T val, Duration ttlOverride);

void set(String key, T val);
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/schematic/api/cache/CachedItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ public void setExpiration(Instant expiration) {
public String getKey() {
return key;
}
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/schematic/api/cache/LocalCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class LocalCache<T> implements CacheProvider<T> {
Expand Down Expand Up @@ -103,4 +103,4 @@ private void remove(String key) {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ public void info(String message, Object... args) {
public void debug(String message, Object... args) {
System.out.println("[DEBUG] " + String.format(message, args));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

public interface SchematicLogger {
void error(String message, Object... args);

void warn(String message, Object... args);

void info(String message, Object... args);

void debug(String message, Object... args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public ListApiKeysResponse listApiKeys(ListApiKeysRequest request, RequestOption
httpUrl.addQueryParameter(
"environment_id", request.getEnvironmentId().get());
}
httpUrl.addQueryParameter(
"require_environment", Boolean.toString(request.getRequireEnvironment()));
httpUrl.addQueryParameter("require_environment", Boolean.toString(request.getRequireEnvironment()));
if (request.getLimit().isPresent()) {
httpUrl.addQueryParameter("limit", request.getLimit().get().toString());
}
Expand Down Expand Up @@ -363,8 +362,7 @@ public CountApiKeysResponse countApiKeys(CountApiKeysRequest request, RequestOpt
httpUrl.addQueryParameter(
"environment_id", request.getEnvironmentId().get());
}
httpUrl.addQueryParameter(
"require_environment", Boolean.toString(request.getRequireEnvironment()));
httpUrl.addQueryParameter("require_environment", Boolean.toString(request.getRequireEnvironment()));
if (request.getLimit().isPresent()) {
httpUrl.addQueryParameter("limit", request.getLimit().get().toString());
}
Expand Down
17 changes: 6 additions & 11 deletions src/test/java/com/schematic/api/TestCache.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package com.schematic.api;

import com.schematic.api.cache.LocalCache;
import static org.junit.jupiter.api.Assertions.*;

import com.schematic.api.cache.CacheProvider;
import org.junit.jupiter.api.Test;
import com.schematic.api.cache.LocalCache;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;

class LocalCacheTest {

Expand Down Expand Up @@ -53,10 +54,7 @@ void testDefaultTTL() throws InterruptedException {

@Test
void testDefaultCapacity() {
LocalCache<Integer> cacheProvider = new LocalCache<>(
LocalCache.DEFAULT_CACHE_CAPACITY,
Duration.ofMinutes(10)
);
LocalCache<Integer> cacheProvider = new LocalCache<>(LocalCache.DEFAULT_CACHE_CAPACITY, Duration.ofMinutes(10));
String key = "test_key";

cacheProvider.set(key, -1);
Expand Down Expand Up @@ -116,10 +114,7 @@ void testConcurrentAccess() throws InterruptedException {
}

assertEquals(cacheCapacity, cacheHits.size());
assertNotEquals(
cacheCapacity,
cacheHits.get(cacheHits.size() - 1) - cacheHits.get(0) + 1
);
assertNotEquals(cacheCapacity, cacheHits.get(cacheHits.size() - 1) - cacheHits.get(0) + 1);
}

@Test
Expand Down
Loading