Orchestration Layer Issue Capturing #4167
…sh codes
- Remove explicit code parameter from emit methods; auto-generate S-prefix + 6-char SHA256 hex hash codes consistent with previous ServiceLayerLog4j2Appender convention (executor uses T prefix)
- Simplify call signatures: emitFlowIssue(submitter, dagId, severity, summary) and emitJobIssue(submitter, dagId, jobName, severity, summary)
- Clean up inline fully-qualified imports across all DagProc subclasses and DagProcUtils (IssueSeverity, ExceptionUtils)
- Remove unused imports
- Add thread safety documentation to ServiceLayerIssueEmitter
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
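The code scheme named in this commit (a prefix letter followed by six hex characters of a SHA-256 hash) can be sketched as follows. This is a minimal illustration, not the PR's implementation: the class and method names are hypothetical, and which string is hashed (here the issue summary), its encoding, and the hex case are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration only; not a class from the PR.
final class IssueCodeSketch {
  private IssueCodeSketch() {}

  /**
   * Builds a code of the form prefix + 6 lowercase hex chars, where the hex
   * chars are the first three bytes of SHA-256(summary).
   * Assumption: the summary text is the hash input.
   */
  static String issueCode(char prefix, String summary) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(summary.getBytes(StandardCharsets.UTF_8));
      StringBuilder code = new StringBuilder().append(prefix);
      for (int i = 0; i < 3; i++) {
        // Mask to an unsigned value so each byte yields exactly two hex digits.
        code.append(String.format("%02x", hash[i] & 0xff));
      }
      return code.toString();
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is a required algorithm on every Java platform.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // 'S' for the service layer, 'T' for the executor, per the commit message.
    System.out.println(issueCode('S', "Flow compilation failed"));
    System.out.println(issueCode('T', "Flow compilation failed"));
  }
}
```

Because the code is derived only from the input string, the same summary always maps to the same code, so no explicit code parameter is needed at the call site.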
…urce
Use addAll instead of reassignment to be consistent with standard collection patterns and avoid potential issues with list reference.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move ServiceLayerIssueEmitter call after the JOB_FAILED timer event in DagProcUtils.submitJobToExecutorInternal(). Previously, the issue event was emitted before JOB_FAILED, so when KafkaJobStatusMonitor processed the FAILED event, the error classifier would find the S-prefixed issue in the repository, fail to classify it, and produce a duplicate T0000 "ErrorCategory: UNKNOWN" issue. By emitting after JOB_FAILED, the classifier runs on an empty issue list (matching the old log interceptor behavior where issues were emitted in the finally block after the exception propagated). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
These files were inadvertently modified during the build process and should not be part of this PR. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…Emitter
- Revert gobblin-yarn changes (from separate PR apache#4165)
- Revert log4j dependency addition in build.gradle (no longer needed)
- Trim verbose javadoc in ServiceLayerIssueEmitter
- Remove debug log from emit()
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
 * Thread-safe: no shared mutable state; flow/job context passed explicitly to prevent cross-flow contamination.
 */
@Slf4j
public final class ServiceLayerIssueEmitter {

nit: let's rename this to OrchestratorIssueEmitter

can you please add unit tests for all the dag procs?
String jobName = this.dagNodeId != null ? this.dagNodeId.getJobName() : JobStatusRetriever.NA_KEY;

try (
    Closeable c1 = MDC.putCloseable(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);

where are these getting used?
    jobMetadata, specExecutorUri);
}

private static void submitJobToExecutorInternal(DagManagementStateStore dagManagementStateStore,

is there a need for a new function here?
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #4167 +/- ##
============================================
+ Coverage 48.36% 55.41% +7.04%
+ Complexity 8742 1624 -7118
============================================
Files 1616 311 -1305
Lines 64748 10911 -53837
Branches 7302 1100 -6202
============================================
- Hits 31317 6046 -25271
+ Misses 30645 4344 -26301
+ Partials 2786 521 -2265
- Revert DagProc.java to master (remove MDC/Closeable, config field)
- Revert DagProcUtils.java MDC and submitJobToExecutorInternal extraction, keep only the ServiceLayerIssueEmitter.emitJobIssue call in catch block
- Rewrite DagProcServiceLayerIssueIntegrationTest to test ServiceLayerIssueEmitter directly (issue codes, emit methods, null safety)
- Add emitter verification tests to existing test classes: LaunchDagProcTest, KillDagProcTest, ReevaluateDagProcTest, ResumeDagProcTest, EnforceDeadlineDagProcsTest
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- EnforceDeadlineDagProcsTest: remove duplicate Dag import
- ReevaluateDagProcTest: remove unused EventSubmitter import
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rename across all 14 files (source + tests) for clarity. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
if (queriedJobStatus.getMessage() != null) {
  flowMessage = queriedJobStatus.getMessage();
}
if (includeIssues) {

getIssues() returns an Optional — if it's empty this will throw NoSuchElementException. Should be:
queriedJobStatus.getIssues().ifPresent(issues ->
flowIssues.addAll(issues.stream()
.map(FlowExecutionResource::convertIssueToRestApiObject)
.collect(Collectors.toList())));
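The suggestion above can be tried in isolation with a small stand-in. This sketch assumes, as the comment states, that the getter returns a java.util.Optional; the class name, the String element type, and the toUpperCase mapping (standing in for convertIssueToRestApiObject) are all hypothetical and not from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical stand-in for illustration only; not a class from the PR.
final class OptionalIssuesSketch {
  private OptionalIssuesSketch() {}

  /** Copies converted issues into a new list only when the Optional holds a value. */
  static List<String> collectIssues(Optional<List<String>> maybeIssues) {
    List<String> flowIssues = new ArrayList<>();
    // ifPresent skips the lambda entirely for an empty Optional, so nothing is thrown.
    maybeIssues.ifPresent(issues ->
        flowIssues.addAll(issues.stream()
            .map(String::toUpperCase) // stand-in for the real conversion method
            .collect(Collectors.toList())));
    return flowIssues;
  }

  public static void main(String[] args) {
    System.out.println(collectIssues(Optional.of(Arrays.asList("a", "b"))));
    System.out.println(collectIssues(Optional.empty()));
    try {
      // The unguarded form the reviewer warns about.
      Optional.<List<String>>empty().get();
    } catch (NoSuchElementException e) {
      System.out.println("get() on an empty Optional throws NoSuchElementException");
    }
  }
}
```

The guarded form also keeps the addAll pattern from the earlier commit, so the target list reference is never reassigned.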
CLAUDE.md
Outdated
Scope: work only within this directory. Do not search parent or sibling directories unless explicitly asked.

## Purpose
Apache Gobblin open-source fork - LinkedIn's ETL framework (sources, converters, writers).
if (!isRetry && !FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
  // this may happen if adding job status in the store failed/delayed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
  throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action should have been "
  String message = String.format("Job status for dagNode %s is %s. Re-evaluate dag action should have been "

is this variable needed?

it's used by both emitJobIssue and throw new RuntimeException(message), so extracted it
if (jobStatus.isShouldRetry()) {
  log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
      DagUtils.getFullyQualifiedJobName(dagNode), jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
  // todo - be careful when unsetting this, it is possible that this is set to FAILED because some other job in the

Should we emit a WARN issue from here about job being retried?
- FlowExecutionResource: add null check for getIssues() supplier before calling .get() to prevent NPE
- ReevaluateDagProc: emit WARN issue when job is being retried with attempt count info
- Remove accidentally committed CLAUDE.md
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
This PR adds explicit issue capturing at the orchestration/service layer in Gobblin. Previously, issues were only captured at the executor/job level (via AutoTroubleshooterLogAppender), leaving orchestration errors (compilation failures, SLA violations, job submission failures) invisible to users.
What changed:
- New ServiceLayerIssueEmitter utility: emits issues through the existing IssueEventBuilder → Kafka → JobIssueEventHandler → MultiContextIssueRepository pipeline. Issue codes use an S prefix + 6-char hex hash, consistent with the executor-side T prefix convention.
- Instrumented 18 orchestration error points across all DagProc subclasses and FlowCompilationValidationHelper:
- Added issues array to FlowExecution.pdl: surfaces flow-level issues in the REST API response for cases where jobStatuses is empty (e.g., compilation failures). Previously, issues stored against the NA pseudo-job context were never included in the API response.
- Updated FlowExecutionResource.convertFlowStatus(): extracts issues from the NA pseudo-job and populates the new flow-level issues array.

Backward compatibility:
- …AutoTroubleshooterLogAppender) is completely untouched
- jobStatuses[i].issues[]
- issues field on FlowExecution defaults to [] (Rest.li compatible: [MD-COMPAT]: true)

Thread safety:
ServiceLayerIssueEmitter has no shared mutable state: flow/job context is passed explicitly (not read from MDC), preventing cross-flow contamination.

Tests
- DagProcServiceLayerIssueIntegrationTest.java: verifies MDC context isolation, issue attribution, and context ID format consistency

Commits