Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions k8s/deployment/templates/deployment.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ spec:
{{- template "probe.base" dict "healthCheck" .scope.capabilities.health_check }}
failureThreshold: 90
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
terminationMessagePolicy: FallbackToLogsOnError
imagePullPolicy: Always

{{ if .scope.capabilities.additional_ports }}
Expand Down Expand Up @@ -227,7 +227,7 @@ spec:
successThreshold: 1
failureThreshold: 90
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
terminationMessagePolicy: FallbackToLogsOnError
imagePullPolicy: Always
{{ end }}
{{ end }}
Expand Down Expand Up @@ -287,7 +287,7 @@ spec:
- /bin/sleep
- '16'
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
terminationMessagePolicy: FallbackToLogsOnError
imagePullPolicy: IfNotPresent
volumeMounts:
{{- if .parameters.results }}
Expand Down
Binary file modified k8s/log/kube-logger-go/bin/linux/exec-amd64
Binary file not shown.
Binary file modified k8s/log/kube-logger-go/bin/linux/exec-arm64
Binary file not shown.
Binary file modified k8s/log/kube-logger-go/bin/linux/exec-x86_64
Binary file not shown.
114 changes: 73 additions & 41 deletions k8s/log/kube-logger-go/internal/logs/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,31 @@ func (f *Fetcher) FetchConcurrently(pods []corev1.Pod, config types.Config) []ty

// Determine since time for this pod
sinceTime := determineSinceTime(podUID, lastReadTimes, config.StartTime)

// Get pod logs
logCh := make(chan string, 100)
go func() {
defer close(logCh)
f.streamPodLogs(&p, config.Namespace, sinceTime, int64(podLimit*3072), logCh)
}()

processor := NewProcessor()
processedLogs := processor.ProcessLinesFromChannel(logCh, config.FilterPattern, p.Name, podUID, getLastReadTime(podUID, lastReadTimes))

if len(processedLogs) > 0 {
mu.Lock()
allLogs = append(allLogs, processedLogs...)
mu.Unlock()
}
lastReadTime := getLastReadTime(podUID, lastReadTimes)
limitBytes := int64(podLimit * 3072)

// Previous-container logs (the crashed instance) are only pulled
// on the first page for this pod. Once a pagination cursor exists
// for the pod, prior-instance lines either fall before the cursor
// or were already returned, so re-fetching them every page would
// waste bandwidth on lines the processor would discard anyway.
logCh := make(chan string, 200)
go func() {
defer close(logCh)
if lastReadTime == "" && hasPreviousInstance(&p) {
f.streamPodLogs(&p, config.Namespace, sinceTime, limitBytes, true, logCh)
}
f.streamPodLogs(&p, config.Namespace, sinceTime, limitBytes, false, logCh)
}()

processor := NewProcessor()
processedLogs := processor.ProcessLinesFromChannel(logCh, config.FilterPattern, p.Name, podUID, lastReadTime)

if len(processedLogs) > 0 {
mu.Lock()
allLogs = append(allLogs, processedLogs...)
mu.Unlock()
}
}(pod)
}

Expand Down Expand Up @@ -114,30 +123,53 @@ func (f *Fetcher) getPodLogs(pod *corev1.Pod, namespace, sinceTime string, limit
return logContent.String()
}

func (f *Fetcher) streamPodLogs(pod *corev1.Pod, namespace, sinceTime string, limitBytes int64, logCh chan<- string) {
ctx := context.Background()
opts := &corev1.PodLogOptions{
Container: types.DefaultContainerName,
Timestamps: true,
LimitBytes: &limitBytes,
}
if sinceTime != "" {
if sinceTimeObj, err := time.Parse(time.RFC3339, sinceTime); err == nil {
metaTime := metav1.NewTime(sinceTimeObj)
opts.SinceTime = &metaTime
}
}
req := f.clientset.CoreV1().Pods(namespace).GetLogs(pod.Name, opts)
podLogs, err := req.Stream(ctx)
if err != nil {
return
}
defer podLogs.Close()

scanner := bufio.NewScanner(podLogs)
for scanner.Scan() {
logCh <- scanner.Text()
}
func (f *Fetcher) streamPodLogs(pod *corev1.Pod, namespace, sinceTime string, limitBytes int64, previous bool, logCh chan<- string) {
ctx := context.Background()
opts := &corev1.PodLogOptions{
Container: types.DefaultContainerName,
Timestamps: true,
LimitBytes: &limitBytes,
Previous: previous,
}
if sinceTime != "" {
if sinceTimeObj, err := time.Parse(time.RFC3339, sinceTime); err == nil {
metaTime := metav1.NewTime(sinceTimeObj)
opts.SinceTime = &metaTime
}
}
req := f.clientset.CoreV1().Pods(namespace).GetLogs(pod.Name, opts)
podLogs, err := req.Stream(ctx)
if err != nil {
// Expected when previous=true and no prior container instance exists
// (e.g. fresh pod that has never restarted). Swallow silently — the
// live-log fetch on the next call still runs.
return
}
defer podLogs.Close()

scanner := bufio.NewScanner(podLogs)
for scanner.Scan() {
logCh <- scanner.Text()
}
}

// hasPreviousInstance reports whether the pod's application container has a
// recoverable previous instance (i.e. it has restarted at least once or has a
// terminated last state). The kubelet retains exactly one prior instance per
// container, so this is the gate for whether `Previous: true` will yield data.
func hasPreviousInstance(pod *corev1.Pod) bool {
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name != types.DefaultContainerName {
continue
}
if cs.RestartCount > 0 {
return true
}
if cs.LastTerminationState.Terminated != nil {
return true
}
}
return false
}

// determineSinceTime determines the appropriate since time for a pod
Expand All @@ -154,4 +186,4 @@ func getLastReadTime(podUID string, lastReadTimes map[string]string) string {
return lastTime
}
return ""
}
}
Loading