Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix UDP template expiry in collector process #381

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
k8s.io/apimachinery v0.31.0
k8s.io/component-base v0.31.0
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3
)

require (
Expand Down Expand Up @@ -62,7 +63,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)
Expand Down
117 changes: 89 additions & 28 deletions pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"time"

"k8s.io/klog/v2"
"k8s.io/utils/clock"

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
Expand All @@ -50,9 +51,15 @@
DecodingModeLenientDropUnknown DecodingMode = "LenientDropUnknown"
)

type template struct {
ies []*entities.InfoElement
expiryTime time.Time
expiryTimer clock.Timer
}

type CollectingProcess struct {
// for each obsDomainID, there is a map of templates
templatesMap map[uint32]map[uint16][]*entities.InfoElement
templatesMap map[uint32]map[uint16]*template
// mutex allows multiple readers or one writer at the same time
mutex sync.RWMutex
// template lifetime
Expand Down Expand Up @@ -85,6 +92,8 @@
serverKey []byte
wg sync.WaitGroup
numOfRecordsReceived uint64
// clock implementation: enables injecting a fake clock for testing
clock clock.WithDelayedExecution
}

type CollectorInput struct {
Expand Down Expand Up @@ -113,7 +122,7 @@
closeClientChan chan struct{}
}

func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) {
func initCollectingProcess(input CollectorInput, clock clock.WithDelayedExecution) (*CollectingProcess, error) {
templateTTLSeconds := input.TemplateTTL
if input.Protocol == "udp" && templateTTLSeconds == 0 {
templateTTLSeconds = entities.TemplateTTL
Expand All @@ -128,8 +137,8 @@
"encrypted", input.IsEncrypted, "address", input.Address, "protocol", input.Protocol, "maxBufferSize", input.MaxBufferSize,
"templateTTL", templateTTL, "numExtraElements", input.NumExtraElements, "decodingMode", decodingMode,
)
collectProc := &CollectingProcess{
templatesMap: make(map[uint32]map[uint16][]*entities.InfoElement),
cp := &CollectingProcess{
templatesMap: make(map[uint32]map[uint16]*template),
mutex: sync.RWMutex{},
templateTTL: templateTTL,
address: input.Address,
Expand All @@ -144,8 +153,13 @@
serverKey: input.ServerKey,
numExtraElements: input.NumExtraElements,
decodingMode: decodingMode,
clock: clock,
}
return collectProc, nil
return cp, nil
}

func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) {
return initCollectingProcess(input, clock.RealClock{})
}

func (cp *CollectingProcess) Start() {
Expand Down Expand Up @@ -321,7 +335,7 @@

func (cp *CollectingProcess) decodeDataSet(dataBuffer *bytes.Buffer, obsDomainID uint32, templateID uint16) (entities.Set, error) {
// make sure template exists
template, err := cp.getTemplate(obsDomainID, templateID)
template, err := cp.getTemplateIEs(obsDomainID, templateID)
if err != nil {
return nil, fmt.Errorf("template %d with obsDomainID %d does not exist", templateID, obsDomainID)
}
Expand Down Expand Up @@ -361,47 +375,94 @@
func (cp *CollectingProcess) addTemplate(obsDomainID uint32, templateID uint16, elementsWithValue []entities.InfoElementWithValue) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if _, exists := cp.templatesMap[obsDomainID]; !exists {
cp.templatesMap[obsDomainID] = make(map[uint16][]*entities.InfoElement)
if _, ok := cp.templatesMap[obsDomainID]; !ok {
cp.templatesMap[obsDomainID] = make(map[uint16]*template)
}
elements := make([]*entities.InfoElement, 0)
for _, elementWithValue := range elementsWithValue {
elements = append(elements, elementWithValue.GetInfoElement())
}
cp.templatesMap[obsDomainID][templateID] = elements
// template lifetime management
tpl, ok := cp.templatesMap[obsDomainID][templateID]
if !ok {
tpl = &template{}
cp.templatesMap[obsDomainID][templateID] = tpl
}
tpl.ies = elements
klog.V(4).InfoS("Added template to template map", "obsDomainID", obsDomainID, "templateID", templateID)
// Template lifetime management for UDP.
if cp.protocol != "udp" {
return
}
// Handle udp template expiration
go func() {
ticker := time.NewTicker(cp.templateTTL)
defer ticker.Stop()
select {
case <-ticker.C:
tpl.expiryTime = cp.clock.Now().Add(cp.templateTTL)
if tpl.expiryTimer == nil {
tpl.expiryTimer = cp.clock.AfterFunc(cp.templateTTL, func() {
klog.Infof("Template with id %d, and obsDomainID %d is expired.", templateID, obsDomainID)
cp.deleteTemplate(obsDomainID, templateID)
break
now := cp.clock.Now()
// From the Go documentation:
// For a func-based timer created with AfterFunc(d, f), Reset either
// reschedules when f will run, in which case Reset returns true, or
// schedules f to run again, in which case it returns false. When Reset
// returns false, Reset neither waits for the prior f to complete before
// returning nor does it guarantee that the subsequent goroutine running f
// does not run concurrently with the prior one. If the caller needs to
// know whether the prior execution of f is completed, it must coordinate
// with f explicitly.
// In our case, when f executes, we have to verify that the record is indeed
// scheduled for deletion by checking expiryTime. We cannot just
// automatically delete the template.

// No reason to try to stop the timer in this case, even though it would be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to follow the logic, is it because the expiryTimer is already nil, so there's no point to set stopTimer to true? Because I was thinking about L444, wondering if there's a case when stopTimer is true but expiryTimer is nil.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code executes as part of the function invoked by the timer. Broadly speaking, if the function has been invoked by the timer, then the timer has already fired and there is no need to stop it. There are some edge cases with AfterFunc which are described in the documentation snippet above.
Essentially there are 2 cases:

  • the timer fires because we need to expire the template. This is the "normal" case. The timer has fired and there is no need to stop it.
  • the timer fires, but there is a concurrent call to addTemplate (and to AfterFunc). This is the edge case described in the doc. We rely on the condition below (!tpl.expiryTime.After(now)) to determine whether we should just ignore this firing of the timer. The synchronization provided by cp.mutex is very important here. We will wait until the call to addTemplate returns and releases the lock. If the condition is false (which will be the case), we will not call expiryTimer.Stop() in deleteTemplateWithConds (even if stopTimer was set to true), as the template entry has been updated and the current run of the timer function should be ignored and be a no-op.

Calling expiryTimer.Stop() even if the timer has already been stopped is harmless. But it doesn't work well with the fake clock unfortunately. If the fake clock implementation is fixed, I may remove stopTimer.

The nil check at line L444 is more for defensive programming and for the TCP case.

// technically correct, so we pass false for stopTimer.
cp.deleteTemplateWithConds(obsDomainID, templateID, false, func(tpl *template) bool {
// lock will be held when this executes
return !tpl.expiryTime.After(now)
})
})
} else {
tpl.expiryTimer.Reset(cp.templateTTL)
}
}

// deleteTemplate returns true iff a template was actually deleted.
func (cp *CollectingProcess) deleteTemplate(obsDomainID uint32, templateID uint16) bool {
return cp.deleteTemplateWithConds(obsDomainID, templateID, true)
}

// deleteTemplateWithConds returns true iff a template was actually deleted.
func (cp *CollectingProcess) deleteTemplateWithConds(obsDomainID uint32, templateID uint16, stopTimer bool, condFns ...func(*template) bool) bool {
cp.mutex.Lock()
defer cp.mutex.Unlock()
template, ok := cp.templatesMap[obsDomainID][templateID]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the templateID be deleted before, and the timer cannot be stopped when we can't find the templateID in templatesMap?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which case / code path are you referring to?
the only case where stopTimer should be false is if the caller knows for certain than the timer has already expired (taking into account that condFns should all be true).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, when we remove the templateID from a templatesMap, shouldn't we call the expiryTimer.Stop()? otherwise; wouldn't expiryTimer keep running?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also remind me where we call deleteTemplate? If we call deleteTemplate after a template is expired, deleteTemplate will return false and the expiryTimer. Stop () won't be called?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, when we remove the templateID from a templatesMap, shouldn't we call the expiryTimer.Stop()? otherwise; wouldn't expiryTimer keep running?

We do this a few lines below:

	if stopTimer && template.expiryTimer != nil {
		template.expiryTimer.Stop()
	}

Could you also remind me where we call deleteTemplate? If we call deleteTemplate after a template is expired, deleteTemplate will return false and the expiryTimer. Stop () won't be called?

deleteTemplate is not called right now actually. I have a follow up patch which deletes templates when TCP connections are closed, but I wanted to split PRs to keep them small. Sorry for the confusion. At the moment we call deleteTemplateWithConds when expiring a template. During this call, we do not need to Stop the timer, as we are guaranteed that the timer has already fired (see comments in addTemplate).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. So stopTimer is mainly for TCP when we want to delete the template before it is expired.

if !ok {
return false
}
for _, condFn := range condFns {
if !condFn(template) {
return false

Check warning on line 441 in pkg/collector/process.go

View check run for this annotation

Codecov / codecov/patch

pkg/collector/process.go#L441

Added line #L441 was not covered by tests
}
}()
}
if stopTimer && template.expiryTimer != nil {
template.expiryTimer.Stop()
}
delete(cp.templatesMap[obsDomainID], templateID)
klog.V(4).InfoS("Deleted template from template map", "obsDomainID", obsDomainID, "templateID", templateID)
if len(cp.templatesMap[obsDomainID]) == 0 {
delete(cp.templatesMap, obsDomainID)
klog.V(4).InfoS("No more templates for observation domain", "obsDomainID", obsDomainID)
}
return true
}

func (cp *CollectingProcess) getTemplate(obsDomainID uint32, templateID uint16) ([]*entities.InfoElement, error) {
func (cp *CollectingProcess) getTemplateIEs(obsDomainID uint32, templateID uint16) ([]*entities.InfoElement, error) {
cp.mutex.RLock()
defer cp.mutex.RUnlock()
if elements, exists := cp.templatesMap[obsDomainID][templateID]; exists {
return elements, nil
if template, ok := cp.templatesMap[obsDomainID][templateID]; ok {
return template.ies, nil
} else {
return nil, fmt.Errorf("template %d with obsDomainID %d does not exist", templateID, obsDomainID)
}
}

func (cp *CollectingProcess) deleteTemplate(obsDomainID uint32, templateID uint16) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
delete(cp.templatesMap[obsDomainID], templateID)
}

func (cp *CollectingProcess) updateAddress(address net.Addr) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand Down
Loading
Loading