diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java index 5d7346be7819..2efab3e8f595 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java @@ -102,7 +102,6 @@ Iterable> relevantResults( /** Return assertion to use on results of pipeline for this query. */ public SerializableFunction>, Void> assertionFor() { final Collection expectedStrings = toCollection(simulator().results()); - Assert.assertFalse(expectedStrings.isEmpty()); return new SerializableFunction>, Void>() { @Override diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java index 0f625a2a91f8..b5152d8a0bb8 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java @@ -21,8 +21,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; import java.util.TreeMap; import org.apache.beam.sdk.nexmark.NexmarkConfiguration; import org.apache.beam.sdk.nexmark.NexmarkUtils; @@ -44,17 +42,17 @@ public class Query6Model extends NexmarkQueryModel implements Serializable { * Simulator for query 6. */ private static class Simulator extends AbstractSimulator { - /** The last 10 winning bids ordered by age, indexed by seller id. */ - private final Map> winningBidsPerSeller; + /** The cumulative count of winning bids, indexed by seller id. */ + private final Map numWinningBidsPerSeller; - /** The cumulative total of last 10 winning bid prices, indexed by seller id. */ + /** The cumulative total of winning bid prices, indexed by seller id. */ private final Map totalWinningBidPricesPerSeller; private Instant lastTimestamp; public Simulator(NexmarkConfiguration configuration) { super(new WinningBidsSimulator(configuration).results()); - winningBidsPerSeller = new TreeMap<>(); + numWinningBidsPerSeller = new TreeMap<>(); totalWinningBidPricesPerSeller = new TreeMap<>(); lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; } @@ -64,24 +62,19 @@ public Simulator(NexmarkConfiguration configuration) { */ private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid); - Queue queue = winningBidsPerSeller.get(auction.seller); - if (queue == null) { - queue = new PriorityQueue(10, - (Bid b1, Bid b2) -> Long.compare(b1.dateTime, b2.dateTime)); + Long count = numWinningBidsPerSeller.get(auction.seller); + if (count == null) { + count = 1L; + } else { + count += 1; } + numWinningBidsPerSeller.put(auction.seller, count); Long total = totalWinningBidPricesPerSeller.get(auction.seller); if (total == null) { - total = 0L; - } - int count = queue.size(); - if (count == 10) { - total -= queue.remove().price; + total = bid.price; } else { - count += 1; + total += bid.price; } - queue.add(bid); - total += bid.price; - winningBidsPerSeller.put(auction.seller, queue); totalWinningBidPricesPerSeller.put(auction.seller, total); TimestampedValue intermediateResult = TimestampedValue.of( new SellerPrice(auction.seller, Math.round((double) total / count)), timestamp); @@ -93,9 +86,9 @@ private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) { protected void run() { TimestampedValue timestampedWinningBid = nextInput(); if (timestampedWinningBid == null) { - for (Map.Entry> entry : winningBidsPerSeller.entrySet()) { + for (Map.Entry entry : numWinningBidsPerSeller.entrySet()) { long seller = entry.getKey(); - long count = entry.getValue().size(); + long count = entry.getValue(); long total = totalWinningBidPricesPerSeller.get(seller); addResult(TimestampedValue.of( new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp)); diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java index 937a727819ce..8537e02cd77e 100644 --- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java +++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java @@ -48,7 +48,7 @@ public class QueryTest { static { // careful, results of tests are linked to numEventGenerators because of timestamp generation CONFIG.numEventGenerators = 1; - CONFIG.numEvents = 5000; + CONFIG.numEvents = 1000; } @Rule public TestPipeline p = TestPipeline.create();