Skip to content

Commit

Permalink
Merge pull request #15 from gdemengin/unallocated
Browse files Browse the repository at this point in the history
fix parsing of running pipeline flowgraph while some nodes are still unallocated
  • Loading branch information
gdemengin authored Apr 18, 2022
2 parents ad0d662 + 940933b commit 42905f2
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 117 deletions.
4 changes: 2 additions & 2 deletions .github/jenkins-lts/jenkins-home/jobs/logparser/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<org.jenkinsci.plugins.workflow.job.properties.PipelineTriggersJobProperty>
<triggers>
<hudson.triggers.SCMTrigger>
<spec>H/5 * * * *</spec>
<spec>* * * * *</spec>
<ignorePostCommitHooks>false</ignorePostCommitHooks>
</hudson.triggers.SCMTrigger>
</triggers>
Expand All @@ -14,7 +14,7 @@
<configVersion>2</configVersion>
<userRemoteConfigs>
<hudson.plugins.git.UserRemoteConfig>
<url>file://${GITHUB_WORKSPACE}/.git</url>
<url>file://${GITHUB_WORKSPACE}/.tmp-test/.git</url>
</hudson.plugins.git.UserRemoteConfig>
</userRemoteConfigs>
<branches>
Expand Down
4 changes: 3 additions & 1 deletion .github/jenkins-lts/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

set -e

cd $(dirname $0)

docker build -t jenkins-lts .
export GITHUB_WORKSPACE=/workspace
export GITHUB_SHA=$(git rev-parse --verify HEAD)

docker run -it --rm --name jenkins-lts -p 8080:8080 -e GITHUB_SHA -e GITHUB_WORKSPACE -v "$(pwd -P)/../../":"/workspace" -v "/var/run/docker.sock":"/var/run/docker.sock" -it jenkins-lts
docker run -it --rm -e GITHUB_SHA -e GITHUB_WORKSPACE -v "$(pwd -P)/../../":"/workspace" -it jenkins-lts
21 changes: 21 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -1147,11 +1147,32 @@ def testManyThreads(nbthread, nbloop, nbsubloop) {
}
}

// if more threads than executor version 3.1.1 fails
def testThreadsWithNodes(label, nbthread) {
torun = [:]
nbthread.times {
torun["${it}"] = {
node(label) {
logparser.getLogsWithBranchInfo([ filter : ["${it}"] ])
}
}
}
stage("test ${nbthread} threads with node()") {
timestamps {
parallel torun
}
}
}

// ===============
// = run tests =
// ===============

testLogparser()
// test with less nodes as executor
testThreadsWithNodes(LABEL_LINUX, 2)
// same with more than executors available
testThreadsWithNodes(LABEL_LINUX, 20)
if (RUN_MANYTHREAD_TIMING_TEST) {
testManyThreads(50,20,500)
}
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Compatibility:
### import pipeline-logparser library
in Jenkinsfile import library like this
```
@Library('pipeline-logparser@3.1.1') _
@Library('pipeline-logparser@3.1.2') _
```
_identifier "pipeline-logparser" is the name of the library set by jenkins administrator in instance configuration:_
* _it may be different on your instance_
Expand All @@ -41,7 +41,7 @@ def mylog = logparser.getLogsWithBranchInfo()

### Detailed Documentation

see online documentation here: [logparser.txt](https://htmlpreview.github.io/?https://github.com/gdemengin/pipeline-logparser/blob/3.1.1/vars/logparser.txt)
see online documentation here: [logparser.txt](https://htmlpreview.github.io/?https://github.com/gdemengin/pipeline-logparser/blob/3.1.2/vars/logparser.txt)
* _also available in $JOB_URL/pipeline-syntax/globals#logparser_
* _visible only after the library has been imported once_
* _requires configuring 'Markup Formater' as 'Safe HTML' in $JENKINS_URL/configureSecurity_
Expand Down Expand Up @@ -606,4 +606,7 @@ Note:
- add host in getPipelineStepsUrl for node/agent steps

* 3.1.1 (04/2022)
- speed optimisation (cf #13)
- speed optimisation

* 3.1.2 (04/2022)
- fix issue when parsing logs while some node step is still searching for a host to allocate
223 changes: 112 additions & 111 deletions vars/logparser.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,6 @@ def cachedTree = [:]
// **********************
// * INTERNAL FUNCTIONS *
// **********************
@NonCPS
org.jenkinsci.plugins.workflow.job.views.FlowGraphAction _getFlowGraphAction(build) {
def flowGraph = build.rawBuild.allActions.findAll { it.class == org.jenkinsci.plugins.workflow.job.views.FlowGraphAction }
assert flowGraph.size() == 1
return flowGraph[0]
}

@NonCPS
org.jenkinsci.plugins.workflow.graph.FlowNode _getNode(flowGraph, id) {
def node = flowGraph.nodes.findAll{ it.id == id }
assert node.size() == 1
node = node[0]
}

@NonCPS
org.jenkinsci.plugins.workflow.actions.LogAction _getLogAction(node) {
def logaction = \
Expand All @@ -49,116 +35,132 @@ org.jenkinsci.plugins.workflow.actions.LogAction _getLogAction(node) {
return logaction[0]
}

// expose flowGraphAction as node id map
// with list of children cached to speed-up
// and active status to avoid inconsistencies with children list
@NonCPS
java.util.LinkedHashMap _getChildrenMap(_flowGraph) {
java.util.LinkedHashMap childrenMap = [:]
_flowGraph.nodes.each {
List parentNodeChildren = childrenMap.get(it.enclosingId, [])
parentNodeChildren.add(it)
java.util.LinkedHashMap _getFlowGraphMap(build) {
def flowGraph = build.rawBuild.allActions.findAll { it.class == org.jenkinsci.plugins.workflow.job.views.FlowGraphAction }
assert flowGraph.size() == 1
flowGraph = flowGraph[0]

// init map with copy of active status
// to avoid incomplete list of children if state is changing from active to inactive
// (once inactive, nodes & their children are not updated if cachedTree is updated)
def flowGraphMap = flowGraph.nodes.collectEntries {
[
(it.id): [
node: it,
active: it.active == true,
children: []
]
]
}
return childrenMap

// cache children to speed-up
// get children AFTER active state
def start = null
flowGraph.nodes.each {
if (it.enclosingId != null) {
flowGraphMap[it.enclosingId].children.add(it)
flowGraphMap[it.id] += _getNodeInfos(it)
}
else {
assert it.class == org.jenkinsci.plugins.workflow.graph.FlowStartNode
assert start == null
start = it.id
flowGraphMap[it.id].isBranch = true
}
}

return [start: start, map: flowGraphMap]
}

@NonCPS
java.util.LinkedHashMap _getNodeTree(build, _flowGraph = null, _node = null, _branches=[], _stages=[], _childrenMap = null) {
java.util.LinkedHashMap _getNodeInfos(node) {
def infos = [:]
if (node.class == org.jenkinsci.plugins.workflow.cps.nodes.StepStartNode) {
if (node.descriptor instanceof org.jenkinsci.plugins.workflow.cps.steps.ParallelStep$DescriptorImpl) {
def labelAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.cps.steps.ParallelStepExecution$ParallelLabelAction }
assert labelAction.size() == 1 || labelAction.size() == 0
if (labelAction.size() == 1) {
infos += [ name: labelAction[0].threadName, isBranch: true ]
}
} else if (node.descriptor instanceof org.jenkinsci.plugins.workflow.support.steps.StageStep$DescriptorImpl) {
def labelAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.actions.LabelAction }
assert labelAction.size() == 1 || labelAction.size() == 0
if (labelAction.size() == 1) {
infos += [ name: labelAction[0].displayName, isStage: true, isBranch: true ]
}
} else if (node.descriptor instanceof org.jenkinsci.plugins.workflow.support.steps.ExecutorStep$DescriptorImpl && node.displayName=='Allocate node : Start') {
def argAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.cps.actions.ArgumentsActionImpl }
assert argAction.size() == 1 || argAction.size() == 0
// record the label if any
if (argAction.size() == 1 && argAction[0].unmodifiedArguments) {
infos += [ label: argAction[0].argumentsInternal.label ]
}

def wsAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.support.actions.WorkspaceActionImpl }
// hostname may be missing if host not yet allocated
assert wsAction.size() == 1 || wsAction.size() == 0
// record hostname if any
if (wsAction.size() == 1) {
infos += [ hostname: wsAction[0].node ]
}
}
}
return infos
}

@NonCPS
java.util.LinkedHashMap _getNodeTree(build, _flowGraphMap = null, _node = null) {
def key=build.getFullDisplayName()
if (this.cachedTree.containsKey(key) == false) {
this.cachedTree[key] = [:]
}

def flowGraph = _flowGraph
if (flowGraph == null) {
flowGraph = _getFlowGraphAction(build)
def flowGraphMap = _flowGraphMap
if (flowGraphMap == null) {
flowGraphMap = _getFlowGraphMap(build)
}
def childrenMap = _childrenMap
if (_childrenMap == null) {
childrenMap = _getChildrenMap(flowGraph)

if (flowGraphMap.map.size() == 0) {
// pipeline not yet started, or failed before start
assert _node == null
return [:]
}
def node = _node
def name = null
def stage = false
def branches = _branches.collect{ it }
def stages = _stages.collect { it }
def label = null
def host = null

if (node == null || this.cachedTree[key].containsKey(node.id) == false || this.cachedTree[key][node.id].active) {
// fill in branches and stages lists for children (root branch + named branches/stages only)
if (node == null) {
if (flowGraph.nodes.size() == 0) {
// pipeline not yet started, or failed before start
return [:]
}
def rootNode = flowGraph.nodes.findAll{ it.enclosingId == null && it.class == org.jenkinsci.plugins.workflow.graph.FlowStartNode }
assert rootNode.size() == 1
node = rootNode[0]
branches += [ node.id ]
} else if (node.class == org.jenkinsci.plugins.workflow.cps.nodes.StepStartNode) {
if (node.descriptor instanceof org.jenkinsci.plugins.workflow.cps.steps.ParallelStep$DescriptorImpl) {
def labelAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.cps.steps.ParallelStepExecution$ParallelLabelAction }
assert labelAction.size() == 1 || labelAction.size() == 0
if (labelAction.size() == 1) {
name = labelAction[0].threadName
branches.add(0, node.id)
}
} else if (node.descriptor instanceof org.jenkinsci.plugins.workflow.support.steps.StageStep$DescriptorImpl) {
def labelAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.actions.LabelAction }
assert labelAction.size() == 1 || labelAction.size() == 0
if (labelAction.size() == 1) {
name = labelAction[0].displayName
stage = true
branches.add(0, node.id)
stages.add(0, node.id)
}
} else if (node.descriptor instanceof org.jenkinsci.plugins.workflow.support.steps.ExecutorStep$DescriptorImpl && node.displayName=='Allocate node : Start') {
def argAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.cps.actions.ArgumentsActionImpl }
assert argAction.size() == 1 || argAction.size() == 0
// record the label if any
if (argAction.size() == 1 && argAction[0].unmodifiedArguments) {
label=argAction[0].argumentsInternal.label
}

// record the hostname
def wsAction = node.actions.findAll { it.class == org.jenkinsci.plugins.workflow.support.actions.WorkspaceActionImpl }
assert wsAction.size() == 1
host=wsAction[0].node
}
}
def node = _node
if (node == null) {
node = flowGraphMap.map[flowGraphMap.start].node
}

// add node information in tree
// get active state first
def active = node.isActive() == true
// get children AFTER active state (avoid incomplete list if state was still active)
def children = childrenMap.getOrDefault(node.id, []).sort{ Integer.parseInt("${it.id}") }
def logaction = _getLogAction(node)
// add current node to cache if not already there
// or update it, if it was still active in cache (and possibly incomplete)
if (this.cachedTree[key].containsKey(node.id) == false || this.cachedTree[key][node.id].active) {
def children = flowGraphMap.map[node.id].children.sort{ Integer.parseInt("${it.id}") }

// add parent in tree first
if (this.cachedTree[key].containsKey(node.id) == false) {
this.cachedTree[key][node.id] = [ \
id: node.id,
name: name,
stage: stage,
parents: node.allEnclosingIds,
parent: node.enclosingId,
children: children.collect{ it.id },
branches: _branches,
stages: _stages,
active: active,
haslog: logaction != null,
displayFunctionName: node.displayFunctionName,
url: node.url,
label: label,
host: host
]
} else {
// node exist in cached tree but was active last time it was updated: refresh its children and status
this.cachedTree[key][node.id].active = active
this.cachedTree[key][node.id].children = children.collect{ it.id }
this.cachedTree[key][node.id].haslog = logaction != null
}
this.cachedTree[key][node.id] = [ \
id: node.id,
name: flowGraphMap.map[node.id].name,
stage: flowGraphMap.map[node.id].isStage == true,
parents: node.allEnclosingIds,
parent: node.enclosingId,
children: children.collect{ it.id },
branches: node.allEnclosingIds.findAll{ flowGraphMap.map[it].isBranch },
stages: node.allEnclosingIds.findAll{ flowGraphMap.map[it].isStage },
active: flowGraphMap.map[node.id].active == true,
haslog: _getLogAction(node) != null,
displayFunctionName: node.displayFunctionName,
url: node.url,
label: flowGraphMap.map[node.id].label,
host: flowGraphMap.map[node.id].hostname
]

// then add children
children.each{
_getNodeTree(build, flowGraph, it, branches, stages, childrenMap)
_getNodeTree(build, flowGraphMap, it)
}
}
// else : node was already put in tree while inactive, nothing to update
Expand Down Expand Up @@ -349,8 +351,8 @@ String getLogsWithBranchInfo(java.util.LinkedHashMap options = [:], build = curr
}
*/

def flowGraph = _getFlowGraphAction(build)
def tree = _getNodeTree(build, flowGraph)
def flowGraphMap = _getFlowGraphMap(build)
def tree = _getNodeTree(build, flowGraphMap)

if (this.verbose) {
print "tree=${tree}"
Expand Down Expand Up @@ -380,7 +382,7 @@ String getLogsWithBranchInfo(java.util.LinkedHashMap options = [:], build = curr
}

if (it.haslog) {
def node = _getNode(flowGraph, it.id)
def node = flowGraphMap.map[it.id].node
def logaction = _getLogAction(node)
assert logaction != null

Expand Down Expand Up @@ -441,8 +443,7 @@ java.util.ArrayList getBranches(java.util.LinkedHashMap options = [:], build = c
// 1/ parse options
def opt = _parseOptions(options)

def flowGraph = _getFlowGraphAction(build)
def tree = _getNodeTree(build, flowGraph)
def tree = _getNodeTree(build)

if (this.verbose) {
print "tree=${tree}"
Expand Down

0 comments on commit 42905f2

Please sign in to comment.