diff --git a/.github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml b/.github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml index 2b1daef3abd9..0d811bd59d3c 100644 --- a/.github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml +++ b/.github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml @@ -71,6 +71,22 @@ env: DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + GRPC_ARG_KEEPALIVE_TIME_MS: "30000" + GRPC_ARG_KEEPALIVE_TIMEOUT_MS: "5000" + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA: "0" + GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS: "1" + GRPC_ARG_MAX_RECONNECT_BACKOFF_MS: "120000" + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS: "1000" + GRPC_ARG_MAX_CONNECTION_IDLE_MS: "300000" + GRPC_ARG_MAX_CONNECTION_AGE_MS: "1800000" + BEAM_RETRY_MAX_ATTEMPTS: "5" + BEAM_RETRY_INITIAL_DELAY_MS: "1000" + BEAM_RETRY_MAX_DELAY_MS: "60000" + BEAM_RUNNER_BUNDLE_TIMEOUT_MS: "300000" + BEAM_TESTING_FORCE_SINGLE_BUNDLE: "true" + BEAM_TESTING_DETERMINISTIC_ORDER: "true" + BEAM_SDK_WORKER_PARALLELISM: "1" + BEAM_WORKER_POOL_SIZE: "1" jobs: beam_PreCommit_Java_PVR_Prism_Loopback: diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index de7b89e751ec..661c7929ce85 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -225,6 +225,7 @@ type ElementManager struct { sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input. pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection. + pcolInfo map[string]PColInfo // Map from pcollectionID to PColInfo for key extraction. refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling. inprogressBundles set[string] // Active bundleIDs @@ -255,6 +256,7 @@ func NewElementManager(config Config) *ElementManager { consumers: map[string][]string{}, sideConsumers: map[string][]LinkID{}, pcolParents: map[string]string{}, + pcolInfo: map[string]PColInfo{}, changedStages: set[string]{}, inprogressBundles: set[string]{}, refreshCond: sync.Cond{L: &sync.Mutex{}}, @@ -324,6 +326,10 @@ func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[stri em.stages[ID].processingTimeTimersFamilies = ptTimers } +func (em *ElementManager) RegisterPColInfo(pcolID string, info PColInfo) { + em.pcolInfo[pcolID] = info +} + // AddTestStream provides a builder interface for the execution layer to build the test stream from // the protos. func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder { diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index 90f81d3104b7..cc731920b82d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -16,6 +16,8 @@ package engine import ( + "bytes" + "fmt" "log/slog" "time" @@ -173,18 +175,40 @@ type tsElementEvent struct { // Execute this ElementEvent by routing pending element to their consuming stages. func (ev tsElementEvent) Execute(em *ElementManager) { t := em.testStreamHandler.tagState[ev.Tag] + if t.pcollection == "" { + panic(fmt.Sprintf("TestStream tag %q not found in tagState", ev.Tag)) + } + info, ok := em.pcolInfo[t.pcollection] + if !ok { + panic(fmt.Sprintf("PColInfo not registered for TestStream output PCollection %q (tag %q)", t.pcollection, ev.Tag)) + } var pending []element for _, e := range ev.Elements { + if len(e.Encoded) == 0 { + panic(fmt.Sprintf("TestStream: empty encoded element for tag %q", ev.Tag)) + } + buf := bytes.NewBuffer(e.Encoded) + elmBytes := info.EDec(buf) + if len(elmBytes) == 0 { + panic(fmt.Sprintf("TestStream: decoded element bytes are empty for tag %q, encoded length: %d", ev.Tag, len(e.Encoded))) + } + + var keyBytes []byte + if info.KeyDec != nil { + kbuf := bytes.NewBuffer(elmBytes) + keyBytes = info.KeyDec(kbuf) + } + pending = append(pending, element{ window: window.GlobalWindow{}, timestamp: e.EventTime, - elmBytes: e.Encoded, + elmBytes: elmBytes, + keyBytes: keyBytes, pane: typex.NoFiringPane(), }) } - // Update the consuming state. for _, sID := range em.consumers[t.pcollection] { ss := em.stages[sID] added := ss.AddPending(em, pending) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 05e939411b05..853b7974479d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -277,6 +277,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic // Add a synthetic stage that should largely be unused. em.AddStage(stage.ID, nil, maps.Values(t.GetOutputs()), nil) + for pcolID, info := range stage.OutputsToCoders { + em.RegisterPColInfo(pcolID, info) + } + // Decode the test stream, and convert it to the various events for the ElementManager. var pyld pipepb.TestStreamPayload if err := proto.Unmarshal(t.GetSpec().GetPayload(), &pyld); err != nil { diff --git a/settings.gradle.kts b/settings.gradle.kts index f91951c81896..d9faf789f345 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import com.gradle.enterprise.gradleplugin.internal.extension.BuildScanExtensionWithHiddenFeatures pluginManagement { plugins {