diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java index 2efab3e8f595..5d7346be7819 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java @@ -102,6 +102,7 @@ Iterable> relevantResults( /** Return assertion to use on results of pipeline for this query. */ public SerializableFunction>, Void> assertionFor() { final Collection expectedStrings = toCollection(simulator().results()); + Assert.assertFalse(expectedStrings.isEmpty()); return new SerializableFunction>, Void>() { @Override diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java index b5152d8a0bb8..0f625a2a91f8 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java @@ -21,6 +21,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.TreeMap; import org.apache.beam.sdk.nexmark.NexmarkConfiguration; import org.apache.beam.sdk.nexmark.NexmarkUtils; @@ -42,17 +44,17 @@ public class Query6Model extends NexmarkQueryModel implements Serializable { * Simulator for query 6. */ private static class Simulator extends AbstractSimulator { - /** The cumulative count of winning bids, indexed by seller id. */ - private final Map numWinningBidsPerSeller; + /** The last 10 winning bids ordered by age, indexed by seller id. */ + private final Map> winningBidsPerSeller; - /** The cumulative total of winning bid prices, indexed by seller id. */ + /** The cumulative total of last 10 winning bid prices, indexed by seller id. */ private final Map totalWinningBidPricesPerSeller; private Instant lastTimestamp; public Simulator(NexmarkConfiguration configuration) { super(new WinningBidsSimulator(configuration).results()); - numWinningBidsPerSeller = new TreeMap<>(); + winningBidsPerSeller = new TreeMap<>(); totalWinningBidPricesPerSeller = new TreeMap<>(); lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; } @@ -62,19 +64,24 @@ public Simulator(NexmarkConfiguration configuration) { */ private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid); - Long count = numWinningBidsPerSeller.get(auction.seller); - if (count == null) { - count = 1L; - } else { - count += 1; + Queue queue = winningBidsPerSeller.get(auction.seller); + if (queue == null) { + queue = new PriorityQueue(10, + (Bid b1, Bid b2) -> Long.compare(b1.dateTime, b2.dateTime)); } - numWinningBidsPerSeller.put(auction.seller, count); Long total = totalWinningBidPricesPerSeller.get(auction.seller); if (total == null) { - total = bid.price; + total = 0L; + } + int count = queue.size(); + if (count == 10) { + total -= queue.remove().price; } else { - total += bid.price; + count += 1; } + queue.add(bid); + total += bid.price; + winningBidsPerSeller.put(auction.seller, queue); totalWinningBidPricesPerSeller.put(auction.seller, total); TimestampedValue intermediateResult = TimestampedValue.of( new SellerPrice(auction.seller, Math.round((double) total / count)), timestamp); @@ -86,9 +93,9 @@ private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { protected void run() { TimestampedValue timestampedWinningBid = nextInput(); if (timestampedWinningBid == null) { - for (Map.Entry entry : numWinningBidsPerSeller.entrySet()) { + for (Map.Entry> entry : winningBidsPerSeller.entrySet()) { long seller = entry.getKey(); - long count = entry.getValue(); + long count = entry.getValue().size(); long total = totalWinningBidPricesPerSeller.get(seller); addResult(TimestampedValue.of( new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp)); diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java index 8537e02cd77e..937a727819ce 100644 --- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java @@ -48,7 +48,7 @@ public class QueryTest { static { // careful, results of tests are linked to numEventGenerators because of timestamp generation CONFIG.numEventGenerators = 1; - CONFIG.numEvents = 1000; + CONFIG.numEvents = 5000; } @Rule public TestPipeline p = TestPipeline.create();