인프라/컨테이너

Kubernetes Scheme와 Codec

윤현우 2023. 3. 25. 16:55

Scheme

쿠버네티스에서 스킴은 쿠버네티스 GVK(Group, Version, Kind)로 구분되는 API 객체를 직렬화하고 역직렬화합니다.
예를 들어 apps Group의, v1 Version, Deployment kind를 가지고 있는 객체는 아래와 같습니다.

apiVersion: apps/v1
kind: Deployment

Kubernetes에서 새 스키마를 생성하고 반환하는 NewScheme 코드입니다.  

func NewScheme() *Scheme {
	s := &Scheme{
		gvkToType:                 map[schema.GroupVersionKind]reflect.Type{},
		typeToGVK:                 map[reflect.Type][]schema.GroupVersionKind{},
		unversionedTypes:          map[reflect.Type]schema.GroupVersionKind{},
		unversionedKinds:          map[string]reflect.Type{},
		fieldLabelConversionFuncs: map[schema.GroupVersionKind]FieldLabelConversionFunc{},
		defaulterFuncs:            map[reflect.Type]func(interface{}){},
		versionPriority:           map[string][]string{},
		schemeName:                naming.GetNameFromCallsite(internalPackages...),
	}
	s.converter = conversion.NewConverter(nil)

	// Enable couple default conversions by default.
	utilruntime.Must(RegisterEmbeddedConversions(s))
	utilruntime.Must(RegisterStringConversions(s))
	return s
}

이렇게 반환된 스키마가 제공하는 기능으로 객체는 직렬화되어 ETCD에 저장되거나, Client로 반환됩니다.

실제로 API Server에선 다음과 같은 코드로 스키마를 사용합니다.

func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
		lenientScheme := runtime.NewScheme()
		if err := kubeproxyconfig.AddToScheme(lenientScheme); err != nil {
			return nil, nil, fmt.Errorf("failed to add kube-proxy config API to lenient scheme: %v", err)
		}
		if err := kubeproxyconfigv1alpha1.AddToScheme(lenientScheme); err != nil {
			return nil, nil, fmt.Errorf("failed to add kube-proxy config v1alpha1 API to lenient scheme: %v", err)
		}
		lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
		return lenientScheme, &lenientCodecs, nil
	}
  1. lenientScheme 변수에 Scheme를 할당합니다.
  2. 두 if문에서 kubeproxyconfigv1alpha1과 kubeproxyconfig에 있는 API 객체를 등록합니다.
  3. 그러면 API Server에서 요청을 받을 때 Group, Version, Kind를 통해 적절한 스키마를 찾습니다.
  4. 그리고 아까 추가한 스키마에서 Codec을 이용해 요청받은 데이터를 역직렬화하여 해당 객체를 생성합니다.

Codec

Codec은 데이터의 인코딩/디코딩을 담당합니다. 스키마에 붙는 코덱 인터페이스는 요청받는/하는 데이터를 객체에 맞게 인코딩/디코딩해 줍니다.

다음은 codec의 인/디코딩 함수입니다.

func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
	return f.CodecForVersions(nil, decoder, nil, gv)
}

func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
	return f.CodecForVersions(encoder, nil, gv, nil)
}

디코더와 인코더가 runtime의 GroupVersion을 받아 이에 맞는 인/디코더를 반환해 주는 모습입니다. 이런 객체 생성은 팩토리 메서드를 통해 이루어집니다. 팩토리 클래스에서는 스키마에 맞는 Codec을 리턴해줍니다 위 스키마를 설명할 때 나온 kube-proxy에서도 codecFactory를 만든 후, 코덱을 불러오는 모습을 볼 수 있었습니다.

lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict) 
return lenientScheme, &lenientCodecs, nil

쿠버네티스 내부 구현에서 Encode, Decode는 API Server와 Client 간의 통신에서  JSON-Binary 간 변환을 의미합니다.

대부분 아시겠지만 조금 의아할 수 있습니다.

"나는 지금까지 YAML manifest로 오브젝트들을 생성했는데 JSON??"

하지만 kubernetes api server는 JSON을 사용해 데이터를 처리합니다. Decode의 코드를 살펴보겠습니다.

func (d *decoder) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	var (
		lastErr error
		skipped []runtime.Decoder
	)

	// try recognizers, record any decoders we need to give a chance later
	for _, r := range d.decoders {
		switch t := r.(type) {
		case RecognizingDecoder:
			ok, unknown, err := t.RecognizesData(data)
			if err != nil {
				lastErr = err
				continue
			}
			if unknown {
				skipped = append(skipped, t)
				continue
			}
			if !ok {
				continue
			}
			return r.Decode(data, gvk, into)
		default:
			skipped = append(skipped, t)
		}
	}

	// try recognizers that returned unknown or didn't recognize their data
	for _, r := range skipped {
		out, actual, err := r.Decode(data, gvk, into)
		if err != nil {
			// if we got an object back from the decoder, and the
			// error was a strict decoding error (e.g. unknown or
			// duplicate fields), we still consider the recognizer
			// to have understood the object
			if out == nil || !runtime.IsStrictDecodingError(err) {
				lastErr = err
				continue
			}
		}
		return out, actual, err
	}

	if lastErr == nil {
		lastErr = fmt.Errorf("no serialization format matched the provided data")
	}
	return nil, nil, lastErr
}

YAML, JSON 등 데이터 포맷을 디코딩할 수 있는 객체를 runtime.Decoder에서 불러옵니다. 그리고 디코더 객체에 데이터를 하나씩 넣어보며 디코딩이 가능한 객체를 찾습니다. 

실제로 디코더의 한 부분의 코드를 살펴보면 아래와 같습니다.

func (c yamlSerializer) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	out, err := yaml.ToJSON(data)
	if err != nil {
		return nil, nil, err
	}
	data = out
	return c.Serializer.Decode(data, gvk, into)
}

Decode함수에서 호출하는 yaml package의 Decode함수입니다. yaml.ToJSON으로  데이터를 JSON으로 디코딩 후, 다시 Serializer의 Decode 함수를 부릅니다.

실제로 Serializer의 Decode 함수와 간략한 설명은 아래와 같습니다.

func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	data := originalData
	// yaml 옵션이 true라면 YAML-> JSON으로 변환합니다.
	if s.options.Yaml {
		altered, err := yaml.YAMLToJSON(data)
		if err != nil {
			return nil, nil, err
		}
		data = altered
	}


	// gvk를 받아옵니다.
	actual, err := s.meta.Interpret(data)
	if err != nil {
		return nil, nil, err
	}

	// gvk가 있다면 actual의 gvk를 정정합니다.
	if gvk != nil {
		*actual = gvkWithDefaults(*actual, *gvk)
	}

	// 데이터를 처리할 디코더를 찾지 못한 경우, ContentType을 JSON으로 설정하고 default GVK를 할당합니다.(gvkwithDefaults)
	if unk, ok := into.(*runtime.Unknown); ok && unk != nil {
		unk.Raw = originalData
		unk.ContentType = runtime.ContentTypeJSON
		unk.GetObjectKind().SetGroupVersionKind(*actual)
		return unk, actual, nil
	}

	// into (runtime.Object) 객체가 존재한다면
	if into != nil {
		// runtime.Unstructured를 isUnstructured에 대입
		_, isUnstructured := into.(runtime.Unstructured)
		// types에 Object Kinds를 할당
		types, _, err := s.typer.ObjectKinds(into)
		switch {
		// Not Regustered Error가 발생할 시, 어떤 리소스가 등록되지 않았는지 리턴
		case runtime.IsNotRegisteredError(err), isUnstructured:
			strictErrs, err := s.unmarshal(into, data, originalData)
			if err != nil {
				return nil, actual, err
			}

			// when decoding directly into a provided unstructured object,
			// extract the actual gvk decoded from the provided data,
			// and ensure it is non-empty.
			
			if isUnstructured {
				*actual = into.GetObjectKind().GroupVersionKind()
				if len(actual.Kind) == 0 {
					return nil, actual, runtime.NewMissingKindErr(string(originalData))
				}
				// TODO(109023): require apiVersion here as well once unstructuredJSONScheme#Decode does
			}
			
			if len(strictErrs) > 0 {
				return into, actual, runtime.NewStrictDecodingError(strictErrs)
			}
			return into, actual, nil
		case err != nil:
			// 에러가 있으면 에러 반환
			return nil, actual, err
		default:
			*actual = gvkWithDefaults(*actual, types[0])
		}
	}

	if len(actual.Kind) == 0 {
		return nil, actual, runtime.NewMissingKindErr(string(originalData))
	}
	if len(actual.Version) == 0 {
		return nil, actual, runtime.NewMissingVersionErr(string(originalData))
	}

	// use the target if necessary
	obj, err := runtime.UseOrCreateObject(s.typer, s.creater, *actual, into)
	if err != nil {
		return nil, actual, err
	}
	//데이터를 언마샬링합니다. 끝으로 go object가 리턴됩니다.
	strictErrs, err := s.unmarshal(obj, data, originalData)
	if err != nil {
		return nil, actual, err
	} else if len(strictErrs) > 0 {
		return obj, actual, runtime.NewStrictDecodingError(strictErrs)
	}
	return obj, actual, nil
}

 

코드를 보면 알 수 있듯이 코덱 팩토리에서 GV를 통해 위 코드들과 같은 디코더를 가져와 JSON과 같은 형식으로 디코딩합니다.

이번 글에서는 Sheme과 Codec, 그에 관한 관단한 설명과 디코딩 과정에 대해 얇게 살펴보았습니다. 혹시 틀린 부분이 있다면 언제든 댓글로 남겨주세요.