-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix UDP template expiry in collector process #381
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
"time" | ||
|
||
"k8s.io/klog/v2" | ||
"k8s.io/utils/clock" | ||
|
||
"github.com/vmware/go-ipfix/pkg/entities" | ||
"github.com/vmware/go-ipfix/pkg/registry" | ||
|
@@ -50,9 +51,15 @@ | |
DecodingModeLenientDropUnknown DecodingMode = "LenientDropUnknown" | ||
) | ||
|
||
type template struct { | ||
ies []*entities.InfoElement | ||
expiryTime time.Time | ||
expiryTimer clock.Timer | ||
} | ||
|
||
type CollectingProcess struct { | ||
// for each obsDomainID, there is a map of templates | ||
templatesMap map[uint32]map[uint16][]*entities.InfoElement | ||
templatesMap map[uint32]map[uint16]*template | ||
// mutex allows multiple readers or one writer at the same time | ||
mutex sync.RWMutex | ||
// template lifetime | ||
|
@@ -85,6 +92,8 @@ | |
serverKey []byte | ||
wg sync.WaitGroup | ||
numOfRecordsReceived uint64 | ||
// clock implementation: enables injecting a fake clock for testing | ||
clock clock.WithDelayedExecution | ||
} | ||
|
||
type CollectorInput struct { | ||
|
@@ -113,7 +122,7 @@ | |
closeClientChan chan struct{} | ||
} | ||
|
||
func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) { | ||
func initCollectingProcess(input CollectorInput, clock clock.WithDelayedExecution) (*CollectingProcess, error) { | ||
templateTTLSeconds := input.TemplateTTL | ||
if input.Protocol == "udp" && templateTTLSeconds == 0 { | ||
templateTTLSeconds = entities.TemplateTTL | ||
|
@@ -128,8 +137,8 @@ | |
"encrypted", input.IsEncrypted, "address", input.Address, "protocol", input.Protocol, "maxBufferSize", input.MaxBufferSize, | ||
"templateTTL", templateTTL, "numExtraElements", input.NumExtraElements, "decodingMode", decodingMode, | ||
) | ||
collectProc := &CollectingProcess{ | ||
templatesMap: make(map[uint32]map[uint16][]*entities.InfoElement), | ||
cp := &CollectingProcess{ | ||
templatesMap: make(map[uint32]map[uint16]*template), | ||
mutex: sync.RWMutex{}, | ||
templateTTL: templateTTL, | ||
address: input.Address, | ||
|
@@ -144,8 +153,13 @@ | |
serverKey: input.ServerKey, | ||
numExtraElements: input.NumExtraElements, | ||
decodingMode: decodingMode, | ||
clock: clock, | ||
} | ||
return collectProc, nil | ||
return cp, nil | ||
} | ||
|
||
func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) { | ||
return initCollectingProcess(input, clock.RealClock{}) | ||
} | ||
|
||
func (cp *CollectingProcess) Start() { | ||
|
@@ -321,7 +335,7 @@ | |
|
||
func (cp *CollectingProcess) decodeDataSet(dataBuffer *bytes.Buffer, obsDomainID uint32, templateID uint16) (entities.Set, error) { | ||
// make sure template exists | ||
template, err := cp.getTemplate(obsDomainID, templateID) | ||
template, err := cp.getTemplateIEs(obsDomainID, templateID) | ||
if err != nil { | ||
return nil, fmt.Errorf("template %d with obsDomainID %d does not exist", templateID, obsDomainID) | ||
} | ||
|
@@ -361,47 +375,94 @@ | |
func (cp *CollectingProcess) addTemplate(obsDomainID uint32, templateID uint16, elementsWithValue []entities.InfoElementWithValue) { | ||
cp.mutex.Lock() | ||
defer cp.mutex.Unlock() | ||
if _, exists := cp.templatesMap[obsDomainID]; !exists { | ||
cp.templatesMap[obsDomainID] = make(map[uint16][]*entities.InfoElement) | ||
if _, ok := cp.templatesMap[obsDomainID]; !ok { | ||
cp.templatesMap[obsDomainID] = make(map[uint16]*template) | ||
} | ||
elements := make([]*entities.InfoElement, 0) | ||
for _, elementWithValue := range elementsWithValue { | ||
elements = append(elements, elementWithValue.GetInfoElement()) | ||
} | ||
cp.templatesMap[obsDomainID][templateID] = elements | ||
// template lifetime management | ||
tpl, ok := cp.templatesMap[obsDomainID][templateID] | ||
if !ok { | ||
tpl = &template{} | ||
cp.templatesMap[obsDomainID][templateID] = tpl | ||
} | ||
tpl.ies = elements | ||
klog.V(4).InfoS("Added template to template map", "obsDomainID", obsDomainID, "templateID", templateID) | ||
// Template lifetime management for UDP. | ||
if cp.protocol != "udp" { | ||
return | ||
} | ||
// Handle udp template expiration | ||
go func() { | ||
ticker := time.NewTicker(cp.templateTTL) | ||
defer ticker.Stop() | ||
select { | ||
case <-ticker.C: | ||
tpl.expiryTime = cp.clock.Now().Add(cp.templateTTL) | ||
if tpl.expiryTimer == nil { | ||
tpl.expiryTimer = cp.clock.AfterFunc(cp.templateTTL, func() { | ||
klog.Infof("Template with id %d, and obsDomainID %d is expired.", templateID, obsDomainID) | ||
cp.deleteTemplate(obsDomainID, templateID) | ||
break | ||
now := cp.clock.Now() | ||
// From the Go documentation: | ||
// For a func-based timer created with AfterFunc(d, f), Reset either | ||
// reschedules when f will run, in which case Reset returns true, or | ||
// schedules f to run again, in which case it returns false. When Reset | ||
// returns false, Reset neither waits for the prior f to complete before | ||
// returning nor does it guarantee that the subsequent goroutine running f | ||
// does not run concurrently with the prior one. If the caller needs to | ||
// know whether the prior execution of f is completed, it must coordinate | ||
// with f explicitly. | ||
// In our case, when f executes, we have to verify that the record is indeed | ||
// scheduled for deletion by checking expiryTime. We cannot just | ||
// automatically delete the template. | ||
|
||
// No reason to try to stop the timer in this case, even though it would be | ||
// technically correct, so we pass false for stopTimer. | ||
cp.deleteTemplateWithConds(obsDomainID, templateID, false, func(tpl *template) bool { | ||
// lock will be held when this executes | ||
return !tpl.expiryTime.After(now) | ||
}) | ||
}) | ||
} else { | ||
tpl.expiryTimer.Reset(cp.templateTTL) | ||
} | ||
} | ||
|
||
// deleteTemplate returns true iff a template was actually deleted. | ||
func (cp *CollectingProcess) deleteTemplate(obsDomainID uint32, templateID uint16) bool { | ||
return cp.deleteTemplateWithConds(obsDomainID, templateID, true) | ||
} | ||
|
||
// deleteTemplateWithConds returns true iff a template was actually deleted. | ||
func (cp *CollectingProcess) deleteTemplateWithConds(obsDomainID uint32, templateID uint16, stopTimer bool, condFns ...func(*template) bool) bool { | ||
cp.mutex.Lock() | ||
defer cp.mutex.Unlock() | ||
template, ok := cp.templatesMap[obsDomainID][templateID] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Wouldn't the templateID be deleted before, and the timer cannot be stopped when we can't find the templateID in templatesMap? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Which case / code path are you referring to? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I mean, when we remove the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you also remind me where we call […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
We do this a few lines below: if stopTimer && template.expiryTimer != nil {
template.expiryTimer.Stop()
}
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation. So stopTimer is mainly for TCP, when we want to delete the template before it has expired. |
||
if !ok { | ||
return false | ||
} | ||
for _, condFn := range condFns { | ||
if !condFn(template) { | ||
return false | ||
} | ||
}() | ||
} | ||
if stopTimer && template.expiryTimer != nil { | ||
template.expiryTimer.Stop() | ||
} | ||
delete(cp.templatesMap[obsDomainID], templateID) | ||
klog.V(4).InfoS("Deleted template from template map", "obsDomainID", obsDomainID, "templateID", templateID) | ||
if len(cp.templatesMap[obsDomainID]) == 0 { | ||
delete(cp.templatesMap, obsDomainID) | ||
klog.V(4).InfoS("No more templates for observation domain", "obsDomainID", obsDomainID) | ||
} | ||
return true | ||
} | ||
|
||
func (cp *CollectingProcess) getTemplate(obsDomainID uint32, templateID uint16) ([]*entities.InfoElement, error) { | ||
func (cp *CollectingProcess) getTemplateIEs(obsDomainID uint32, templateID uint16) ([]*entities.InfoElement, error) { | ||
cp.mutex.RLock() | ||
defer cp.mutex.RUnlock() | ||
if elements, exists := cp.templatesMap[obsDomainID][templateID]; exists { | ||
return elements, nil | ||
if template, ok := cp.templatesMap[obsDomainID][templateID]; ok { | ||
return template.ies, nil | ||
} else { | ||
return nil, fmt.Errorf("template %d with obsDomainID %d does not exist", templateID, obsDomainID) | ||
} | ||
} | ||
|
||
func (cp *CollectingProcess) deleteTemplate(obsDomainID uint32, templateID uint16) { | ||
cp.mutex.Lock() | ||
defer cp.mutex.Unlock() | ||
delete(cp.templatesMap[obsDomainID], templateID) | ||
} | ||
|
||
func (cp *CollectingProcess) updateAddress(address net.Addr) { | ||
cp.mutex.Lock() | ||
defer cp.mutex.Unlock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to follow the logic: is it because the expiryTimer is already nil, so there's no point in setting stopTimer to true? I ask because I was thinking about L444, wondering if there's a case where stopTimer is true but expiryTimer is nil.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code executes as part of the function invoked by the timer. Broadly speaking, if the function has been invoked by the timer, then the timer has already fired and there is no need to stop it. There are some edge cases with `AfterFunc`, which are described in the documentation snippet above.
Essentially there are 2 cases: […] `addTemplate` (and to `AfterFunc`). This is the edge case described in the doc. We rely on the condition below (`!tpl.expiryTime.After(now)`) to determine whether we should just ignore this firing of the timer. The synchronization provided by `cp.mutex` is very important here. We will wait until the call to `addTemplate` returns and releases the lock. If the condition is false (which will be the case), we will not call `expiryTimer.Stop()` in `deleteTemplateWithConds` (even if `stopTimer` was set to true), as the template entry has been updated and the current run of the timer function should be ignored and be a no-op.
Calling `expiryTimer.Stop()` even if the timer has already been stopped is harmless. But it doesn't work well with the fake clock unfortunately. If the fake clock implementation is fixed, I may remove `stopTimer`.
The `nil` check at line L444 is more for defensive programming and for the TCP case.