Scheme
쿠버네티스에서 스킴은 쿠버네티스 GVK(Group, Version, Kind)로 구분되는 API 객체를 직렬화하고 역직렬화합니다.
예를 들어 apps Group의, v1 Version, Deployment kind를 가지고 있는 객체는 아래와 같습니다.
apiVersion: apps/v1
kind: Deployment
Kubernetes에서 새 스키마를 생성하고 반환하는 NewScheme 코드입니다.
func NewScheme() *Scheme {
s := &Scheme{
gvkToType: map[schema.GroupVersionKind]reflect.Type{},
typeToGVK: map[reflect.Type][]schema.GroupVersionKind{},
unversionedTypes: map[reflect.Type]schema.GroupVersionKind{},
unversionedKinds: map[string]reflect.Type{},
fieldLabelConversionFuncs: map[schema.GroupVersionKind]FieldLabelConversionFunc{},
defaulterFuncs: map[reflect.Type]func(interface{}){},
versionPriority: map[string][]string{},
schemeName: naming.GetNameFromCallsite(internalPackages...),
}
s.converter = conversion.NewConverter(nil)
// Enable couple default conversions by default.
utilruntime.Must(RegisterEmbeddedConversions(s))
utilruntime.Must(RegisterStringConversions(s))
return s
}
이렇게 반환된 스키마가 제공하는 기능으로 객체는 직렬화되어 ETCD에 저장되거나, Client로 반환됩니다.
실제로 API Server에선 다음과 같은 코드로 스키마를 사용합니다.
func newLenientSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
lenientScheme := runtime.NewScheme()
if err := kubeproxyconfig.AddToScheme(lenientScheme); err != nil {
return nil, nil, fmt.Errorf("failed to add kube-proxy config API to lenient scheme: %v", err)
}
if err := kubeproxyconfigv1alpha1.AddToScheme(lenientScheme); err != nil {
return nil, nil, fmt.Errorf("failed to add kube-proxy config v1alpha1 API to lenient scheme: %v", err)
}
lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
return lenientScheme, &lenientCodecs, nil
}
- lenientScheme 변수에 Scheme를 할당합니다.
- 두 if문에서 kubeproxyconfigv1alpha1과 kubeproxyconfig에 있는 API 객체를 등록합니다.
- 그러면 API Server에서 요청을 받을 때 Group, Version, Kind를 통해 적절한 스키마를 찾습니다.
- 그리고 아까 추가한 스키마에서 Codec을 이용해 요청받은 데이터를 역직렬화하여 해당 객체를 생성합니다.
Codec
Codec은 데이터의 인코딩/디코딩을 담당합니다. 스키마에 붙는 코덱 인터페이스는 요청받는/하는 데이터를 객체에 맞게 인코딩/디코딩해 줍니다.
다음은 codec의 인/디코딩 함수입니다.
func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return f.CodecForVersions(nil, decoder, nil, gv)
}
func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return f.CodecForVersions(encoder, nil, gv, nil)
}
디코더와 인코더가 runtime의 GroupVersion을 받아 이에 맞는 인/디코더를 반환해 주는 모습입니다. 이런 객체 생성은 팩토리 메서드를 통해 이루어집니다. 팩토리 클래스에서는 스키마에 맞는 Codec을 리턴해줍니다 위 스키마를 설명할 때 나온 kube-proxy에서도 codecFactory를 만든 후, 코덱을 불러오는 모습을 볼 수 있었습니다.
lenientCodecs := serializer.NewCodecFactory(lenientScheme, serializer.DisableStrict)
return lenientScheme, &lenientCodecs, nil
쿠버네티스 내부 구현에서 Encode, Decode는 API Server와 Client 간의 통신에서 JSON-Binary 간 변환을 의미합니다.
대부분 아시겠지만 조금 의아할 수 있습니다.
"나는 지금까지 YAML manifest로 오브젝트들을 생성했는데 JSON??"
하지만 kubernetes api server는 JSON을 사용해 데이터를 처리합니다. Decode의 코드를 살펴보겠습니다.
func (d *decoder) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
var (
lastErr error
skipped []runtime.Decoder
)
// try recognizers, record any decoders we need to give a chance later
for _, r := range d.decoders {
switch t := r.(type) {
case RecognizingDecoder:
ok, unknown, err := t.RecognizesData(data)
if err != nil {
lastErr = err
continue
}
if unknown {
skipped = append(skipped, t)
continue
}
if !ok {
continue
}
return r.Decode(data, gvk, into)
default:
skipped = append(skipped, t)
}
}
// try recognizers that returned unknown or didn't recognize their data
for _, r := range skipped {
out, actual, err := r.Decode(data, gvk, into)
if err != nil {
// if we got an object back from the decoder, and the
// error was a strict decoding error (e.g. unknown or
// duplicate fields), we still consider the recognizer
// to have understood the object
if out == nil || !runtime.IsStrictDecodingError(err) {
lastErr = err
continue
}
}
return out, actual, err
}
if lastErr == nil {
lastErr = fmt.Errorf("no serialization format matched the provided data")
}
return nil, nil, lastErr
}
YAML, JSON 등 데이터 포맷을 디코딩할 수 있는 객체를 runtime.Decoder에서 불러옵니다. 그리고 디코더 객체에 데이터를 하나씩 넣어보며 디코딩이 가능한 객체를 찾습니다.
실제로 디코더의 한 부분의 코드를 살펴보면 아래와 같습니다.
func (c yamlSerializer) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
out, err := yaml.ToJSON(data)
if err != nil {
return nil, nil, err
}
data = out
return c.Serializer.Decode(data, gvk, into)
}
Decode함수에서 호출하는 yaml package의 Decode함수입니다. yaml.ToJSON으로 데이터를 JSON으로 디코딩 후, 다시 Serializer의 Decode 함수를 부릅니다.
실제로 Serializer의 Decode 함수와 간략한 설명은 아래와 같습니다.
func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
data := originalData
// yaml 옵션이 true라면 YAML-> JSON으로 변환합니다.
if s.options.Yaml {
altered, err := yaml.YAMLToJSON(data)
if err != nil {
return nil, nil, err
}
data = altered
}
// gvk를 받아옵니다.
actual, err := s.meta.Interpret(data)
if err != nil {
return nil, nil, err
}
// gvk가 있다면 actual의 gvk를 정정합니다.
if gvk != nil {
*actual = gvkWithDefaults(*actual, *gvk)
}
// 데이터를 처리할 디코더를 찾지 못한 경우, ContentType을 JSON으로 설정하고 default GVK를 할당합니다.(gvkwithDefaults)
if unk, ok := into.(*runtime.Unknown); ok && unk != nil {
unk.Raw = originalData
unk.ContentType = runtime.ContentTypeJSON
unk.GetObjectKind().SetGroupVersionKind(*actual)
return unk, actual, nil
}
// into (runtime.Object) 객체가 존재한다면
if into != nil {
// runtime.Unstructured를 isUnstructured에 대입
_, isUnstructured := into.(runtime.Unstructured)
// types에 Object Kinds를 할당
types, _, err := s.typer.ObjectKinds(into)
switch {
// Not Regustered Error가 발생할 시, 어떤 리소스가 등록되지 않았는지 리턴
case runtime.IsNotRegisteredError(err), isUnstructured:
strictErrs, err := s.unmarshal(into, data, originalData)
if err != nil {
return nil, actual, err
}
// when decoding directly into a provided unstructured object,
// extract the actual gvk decoded from the provided data,
// and ensure it is non-empty.
if isUnstructured {
*actual = into.GetObjectKind().GroupVersionKind()
if len(actual.Kind) == 0 {
return nil, actual, runtime.NewMissingKindErr(string(originalData))
}
// TODO(109023): require apiVersion here as well once unstructuredJSONScheme#Decode does
}
if len(strictErrs) > 0 {
return into, actual, runtime.NewStrictDecodingError(strictErrs)
}
return into, actual, nil
case err != nil:
// 에러가 있으면 에러 반환
return nil, actual, err
default:
*actual = gvkWithDefaults(*actual, types[0])
}
}
if len(actual.Kind) == 0 {
return nil, actual, runtime.NewMissingKindErr(string(originalData))
}
if len(actual.Version) == 0 {
return nil, actual, runtime.NewMissingVersionErr(string(originalData))
}
// use the target if necessary
obj, err := runtime.UseOrCreateObject(s.typer, s.creater, *actual, into)
if err != nil {
return nil, actual, err
}
//데이터를 언마샬링합니다. 끝으로 go object가 리턴됩니다.
strictErrs, err := s.unmarshal(obj, data, originalData)
if err != nil {
return nil, actual, err
} else if len(strictErrs) > 0 {
return obj, actual, runtime.NewStrictDecodingError(strictErrs)
}
return obj, actual, nil
}
코드를 보면 알 수 있듯이 코덱 팩토리에서 GV를 통해 위 코드들과 같은 디코더를 가져와 JSON과 같은 형식으로 디코딩합니다.
이번 글에서는 Sheme과 Codec, 그에 관한 관단한 설명과 디코딩 과정에 대해 얇게 살펴보았습니다. 혹시 틀린 부분이 있다면 언제든 댓글로 남겨주세요.