diff --git a/pkg/build/registry/buildconfig/webhook.go b/pkg/build/registry/buildconfig/webhook.go index 850fa35081c6..945adfc171cc 100644 --- a/pkg/build/registry/buildconfig/webhook.go +++ b/pkg/build/registry/buildconfig/webhook.go @@ -94,7 +94,7 @@ func (h *WebHookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ProcessWebHook does the actual work of processing the webhook request func (w *WebHookHandler) ProcessWebHook(writer http.ResponseWriter, req *http.Request, ctx apirequest.Context, name, subpath string) error { - parts := strings.Split(subpath, "/") + parts := strings.Split(strings.TrimPrefix(subpath, "/"), "/") if len(parts) != 2 { return errors.NewBadRequest(fmt.Sprintf("unexpected hook subpath %s", subpath)) } diff --git a/test/testdata/bootstrappolicy/bootstrap_namespace_role_bindings.yaml b/test/testdata/bootstrappolicy/bootstrap_namespace_role_bindings.yaml index c0c96663339c..9c5b6f64da72 100644 --- a/test/testdata/bootstrappolicy/bootstrap_namespace_role_bindings.yaml +++ b/test/testdata/bootstrappolicy/bootstrap_namespace_role_bindings.yaml @@ -121,7 +121,8 @@ items: kind: Role name: shared-resource-viewer subjects: - - kind: Group + - apiGroup: rbac.authorization.k8s.io + kind: Group name: system:authenticated kind: List metadata: {} diff --git a/test/testdata/bootstrappolicy/bootstrap_service_account_project_role_bindings.yaml b/test/testdata/bootstrappolicy/bootstrap_service_account_project_role_bindings.yaml index c187ab195c7f..de8f097650bd 100644 --- a/test/testdata/bootstrappolicy/bootstrap_service_account_project_role_bindings.yaml +++ b/test/testdata/bootstrappolicy/bootstrap_service_account_project_role_bindings.yaml @@ -11,7 +11,8 @@ items: kind: ClusterRole name: system:image-puller subjects: - - kind: Group + - apiGroup: rbac.authorization.k8s.io + kind: Group name: system:serviceaccounts:myproject - apiVersion: rbac.authorization.k8s.io/v1beta1 kind: RoleBinding diff --git a/vendor/k8s.io/kubernetes/cmd/kubelet/app/BUILD b/vendor/k8s.io/kubernetes/cmd/kubelet/app/BUILD index 89e870fb0b4e..febc698d9aed 100644 --- a/vendor/k8s.io/kubernetes/cmd/kubelet/app/BUILD +++ b/vendor/k8s.io/kubernetes/cmd/kubelet/app/BUILD @@ -43,6 +43,7 @@ go_library( "//pkg/capabilities:go_default_library", "//pkg/client/chaosclient:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers:go_default_library", "//pkg/credentialprovider:go_default_library", diff --git a/vendor/k8s.io/kubernetes/cmd/kubelet/app/server.go b/vendor/k8s.io/kubernetes/cmd/kubelet/app/server.go index 8add1565d5d3..c52d7fd0cb1a 100644 --- a/vendor/k8s.io/kubernetes/cmd/kubelet/app/server.go +++ b/vendor/k8s.io/kubernetes/cmd/kubelet/app/server.go @@ -62,6 +62,7 @@ import ( "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/client/chaosclient" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + v1coregenerated "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/features" @@ -158,6 +159,7 @@ func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error ContainerManager: nil, DockerClient: dockerClient, KubeClient: nil, + HeartbeatClient: nil, ExternalKubeClient: nil, Mounter: mounter, NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir, 
s.CNIConfDir, s.CNIBinDir), @@ -453,6 +455,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) { if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil { var kubeClient clientset.Interface var eventClient v1core.EventsGetter + var heartbeatClient v1coregenerated.CoreV1Interface var externalKubeClient clientgoclientset.Interface clientConfig, err := CreateAPIServerClientConfig(s) @@ -481,6 +484,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) { if err != nil { glog.Warningf("New kubeClient from clientConfig error: %v", err) } + // make a separate client for events eventClientConfig := *clientConfig eventClientConfig.QPS = float32(s.EventRecordQPS) @@ -489,6 +493,15 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) { if err != nil { glog.Warningf("Failed to create API Server client: %v", err) } + + // make a separate client for heartbeat with throttling disabled and a timeout attached + heartbeatClientConfig := *clientConfig + heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration + heartbeatClientConfig.QPS = float32(-1) + heartbeatClient, err = v1coregenerated.NewForConfig(&heartbeatClientConfig) + if err != nil { + glog.Warningf("Failed to create API Server client for heartbeat: %v", err) + } } else { switch { case s.RequireKubeConfig: @@ -503,6 +516,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) { kubeDeps.KubeClient = kubeClient kubeDeps.ExternalKubeClient = externalKubeClient kubeDeps.EventClient = eventClient + kubeDeps.HeartbeatClient = heartbeatClient } if kubeDeps.Auth == nil { diff --git a/vendor/k8s.io/kubernetes/pkg/api/validation/validation.go b/vendor/k8s.io/kubernetes/pkg/api/validation/validation.go index 766463d840da..33085ce6e198 100644 --- a/vendor/k8s.io/kubernetes/pkg/api/validation/validation.go +++ b/vendor/k8s.io/kubernetes/pkg/api/validation/validation.go @@ -2707,6 +2707,16 @@ func ValidatePodUpdate(newPod, oldPod *api.Pod) field.ErrorList { // handle updateable fields by munging those fields prior to deep equal comparison. mungedPod := *newPod + + // allow hostname and subdomain to be updated if they are empty. This allows for migration between the beta + // annotations and the GA field when upgrading between Kubernetes 1.6.x and 1.7.x. 
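The two if-blocks that follow plug into ValidatePodUpdate's munge-and-compare pattern: fields that are allowed to change are copied from the old pod onto a scratch copy of the new pod before the specs are deep-compared, so only genuinely forbidden edits produce the "spec: Forbidden" error. A minimal, self-contained sketch of that pattern (simplified stand-in types, not the real validation code):

package main

import (
	"fmt"
	"reflect"
)

// spec stands in for api.PodSpec; only the fields relevant to the sketch appear here.
type spec struct {
	Hostname  string
	Subdomain string
	Image     string
}

// validateUpdate mimics the munge-and-compare approach: updatable fields are
// overwritten on a copy of the new spec with the old values, then the copies are
// deep-compared. Any remaining difference is treated as a forbidden change.
func validateUpdate(newSpec, oldSpec spec) error {
	munged := newSpec
	// allow Hostname/Subdomain to be set when they were previously empty
	// (mirrors the migration carve-out added in the patch above)
	if oldSpec.Hostname == "" {
		munged.Hostname = oldSpec.Hostname
	}
	if oldSpec.Subdomain == "" {
		munged.Subdomain = oldSpec.Subdomain
	}
	// Image is always updatable, so neutralize it as well.
	munged.Image = oldSpec.Image
	if !reflect.DeepEqual(munged, oldSpec) {
		return fmt.Errorf("spec: Forbidden: pod updates may not change fields other than the updatable ones")
	}
	return nil
}

func main() {
	old := spec{Hostname: "", Subdomain: "", Image: "nginx:1.12"}
	// setting a previously empty hostname is tolerated
	fmt.Println(validateUpdate(spec{Hostname: "web-0", Image: "nginx:1.13"}, old)) // <nil>
	// changing an already-set hostname is not
	fmt.Println(validateUpdate(spec{Hostname: "other"}, spec{Hostname: "web-0"})) // spec: Forbidden ...
}

The same trick is what lets a previously empty hostname or subdomain be filled in during the 1.6.x to 1.7.x migration while still rejecting changes to already-set values.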
+ if oldPod.Spec.Hostname == "" { + mungedPod.Spec.Hostname = oldPod.Spec.Hostname + } + if oldPod.Spec.Subdomain == "" { + mungedPod.Spec.Subdomain = oldPod.Spec.Subdomain + } + // munge spec.containers[*].image var newContainers []api.Container for ix, container := range mungedPod.Spec.Containers { diff --git a/vendor/k8s.io/kubernetes/pkg/api/validation/validation_test.go b/vendor/k8s.io/kubernetes/pkg/api/validation/validation_test.go index 114973bf7e2a..a9cb2cb496c4 100644 --- a/vendor/k8s.io/kubernetes/pkg/api/validation/validation_test.go +++ b/vendor/k8s.io/kubernetes/pkg/api/validation/validation_test.go @@ -5781,6 +5781,54 @@ func TestValidatePodUpdate(t *testing.T) { "metadata.annotations[kubernetes.io/config.mirror]", "changed mirror pod annotation", }, + { + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Hostname: "bar"}, + }, + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Hostname: ""}, + }, + "", + "update empty hostname", + }, + { + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Subdomain: "bar"}, + }, + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Subdomain: ""}, + }, + "", + "update empty subdomain", + }, + { + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Hostname: "bar"}, + }, + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Hostname: "baz"}, + }, + "spec: Forbidden", + "update hostname", + }, + { + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Subdomain: "bar"}, + }, + api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.PodSpec{Subdomain: "baz"}, + }, + "spec: Forbidden", + "update subdomain", + }, } for _, test := range tests { diff --git a/vendor/k8s.io/kubernetes/pkg/apis/rbac/BUILD b/vendor/k8s.io/kubernetes/pkg/apis/rbac/BUILD index 05d8e56eb3df..703b21e52e20 100644 --- a/vendor/k8s.io/kubernetes/pkg/apis/rbac/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/apis/rbac/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -44,3 +45,16 @@ filegroup( ], tags = ["automanaged"], ) + +go_test( + name = "go_default_xtest", + srcs = ["helpers_test.go"], + deps = [ + ":go_default_library", + "//pkg/api:go_default_library", + "//pkg/apis/rbac/install:go_default_library", + "//pkg/apis/rbac/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", + ], +) diff --git a/vendor/k8s.io/kubernetes/pkg/apis/rbac/helpers.go b/vendor/k8s.io/kubernetes/pkg/apis/rbac/helpers.go index 5a246ab4509b..5e8ec5124b62 100644 --- a/vendor/k8s.io/kubernetes/pkg/apis/rbac/helpers.go +++ b/vendor/k8s.io/kubernetes/pkg/apis/rbac/helpers.go @@ -349,7 +349,7 @@ func NewRoleBindingForClusterRole(roleName, namespace string) *RoleBindingBuilde // Groups adds the specified groups as the subjects of the RoleBinding. func (r *RoleBindingBuilder) Groups(groups ...string) *RoleBindingBuilder { for _, group := range groups { - r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: GroupKind, Name: group}) + r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: GroupKind, APIGroup: GroupName, Name: group}) } return r } @@ -357,7 +357,7 @@ func (r *RoleBindingBuilder) Groups(groups ...string) *RoleBindingBuilder { // Users adds the specified users as the subjects of the RoleBinding. 
func (r *RoleBindingBuilder) Users(users ...string) *RoleBindingBuilder { for _, user := range users { - r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: UserKind, Name: user}) + r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: UserKind, APIGroup: GroupName, Name: user}) } return r } diff --git a/vendor/k8s.io/kubernetes/pkg/apis/rbac/helpers_test.go b/vendor/k8s.io/kubernetes/pkg/apis/rbac/helpers_test.go new file mode 100644 index 000000000000..e8d134abd349 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/apis/rbac/helpers_test.go @@ -0,0 +1,72 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbac_test + +import ( + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/rbac" + "k8s.io/kubernetes/pkg/apis/rbac/v1beta1" + + // install RBAC types + _ "k8s.io/kubernetes/pkg/apis/rbac/install" +) + +// TestHelpersRoundTrip confirms that the rbac.New* helper functions produce RBAC objects that match objects +// that have gone through conversion and defaulting. This is required because these helper functions are +// used to create the bootstrap RBAC policy which is used during reconciliation. If they produced objects +// that did not match, reconciliation would incorrectly add duplicate data to the cluster's RBAC policy. 
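Before the test body that follows, a short illustration of why the builders above now set Subject.APIGroup: versioned defaulting fills that field in for User and Group subjects on the way back from v1beta1, so a bootstrap object built without it would never compare equal to its round-tripped form and reconciliation would keep re-adding "missing" data. A standalone sketch (simplified Subject type; the defaulting rule is assumed from the behaviour the patch targets rather than copied from the defaulting code):

package main

import (
	"fmt"
	"reflect"
)

// Subject mirrors the handful of rbac.Subject fields used in this sketch.
type Subject struct {
	Kind     string
	APIGroup string
	Name     string
}

const rbacGroupName = "rbac.authorization.k8s.io"

// defaultSubject approximates what versioned defaulting does for User and Group
// subjects: an empty APIGroup is filled in with the RBAC group name.
func defaultSubject(s Subject) Subject {
	if (s.Kind == "User" || s.Kind == "Group") && s.APIGroup == "" {
		s.APIGroup = rbacGroupName
	}
	return s
}

func main() {
	// What the builders used to produce: no APIGroup set.
	built := Subject{Kind: "Group", Name: "system:authenticated"}
	// What comes back after conversion plus defaulting.
	roundTripped := defaultSubject(built)
	fmt.Println(reflect.DeepEqual(built, roundTripped)) // false: reconciliation sees a "difference"

	// With the patched builders the APIGroup is set up front, so the
	// round trip is a no-op and reconciliation stays quiet.
	builtFixed := Subject{Kind: "Group", APIGroup: rbacGroupName, Name: "system:authenticated"}
	fmt.Println(reflect.DeepEqual(builtFixed, defaultSubject(builtFixed))) // true
}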
+func TestHelpersRoundTrip(t *testing.T) { + rb := rbac.NewRoleBinding("role", "ns").Groups("g").SAs("ns", "sa").Users("u").BindingOrDie() + rbcr := rbac.NewRoleBindingForClusterRole("role", "ns").Groups("g").SAs("ns", "sa").Users("u").BindingOrDie() + crb := rbac.NewClusterBinding("role").Groups("g").SAs("ns", "sa").Users("u").BindingOrDie() + + role := &rbac.Role{ + Rules: []rbac.PolicyRule{ + rbac.NewRule("verb").Groups("g").Resources("foo").RuleOrDie(), + rbac.NewRule("verb").URLs("/foo").RuleOrDie(), + }, + } + clusterRole := &rbac.ClusterRole{ + Rules: []rbac.PolicyRule{ + rbac.NewRule("verb").Groups("g").Resources("foo").RuleOrDie(), + rbac.NewRule("verb").URLs("/foo").RuleOrDie(), + }, + } + + for _, internalObj := range []runtime.Object{&rb, &rbcr, &crb, role, clusterRole} { + v1Obj, err := api.Scheme.ConvertToVersion(internalObj, v1beta1.SchemeGroupVersion) + if err != nil { + t.Errorf("err on %T: %v", internalObj, err) + continue + } + api.Scheme.Default(v1Obj) + roundTrippedObj, err := api.Scheme.ConvertToVersion(v1Obj, rbac.SchemeGroupVersion) + if err != nil { + t.Errorf("err on %T: %v", internalObj, err) + continue + } + if !reflect.DeepEqual(internalObj, roundTrippedObj) { + t.Errorf("err on %T: got difference:\n%s", internalObj, diff.ObjectDiff(internalObj, roundTrippedObj)) + continue + } + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go index 5a6ddd561545..0c4d547c3e33 100644 --- a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go +++ b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go @@ -487,8 +487,9 @@ func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region strin } } var err error + hcRequestPath, hcPort := hc.RequestPath, hc.Port if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil { - return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hc.Port, hc.RequestPath, err) + return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hcPort, hcRequestPath, err) } hcLinks = append(hcLinks, hc.SelfLink) } diff --git a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go index cd981080f757..3a11d8e6f56a 100644 --- a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go +++ b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_internal.go @@ -108,7 +108,7 @@ func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, s fwdRuleDeleted := false if existingFwdRule != nil && !fwdRuleEqual(existingFwdRule, expectedFwdRule) { glog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting existing forwarding rule with IP address %v", loadBalancerName, existingFwdRule.IPAddress) - if err = gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) { + if err = ignoreNotFound(gce.DeleteRegionForwardingRule(loadBalancerName, gce.region)); err != nil { return nil, err } fwdRuleDeleted = true @@ -199,7 +199,7 @@ func (gce *GCECloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID st defer gce.sharedResourceLock.Unlock() glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule", 
loadBalancerName) - if err := gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) { + if err := ignoreNotFound(gce.DeleteRegionForwardingRule(loadBalancerName, gce.region)); err != nil { return err } @@ -210,7 +210,7 @@ func (gce *GCECloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID st } glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting firewall for traffic", loadBalancerName) - if err := gce.DeleteFirewall(loadBalancerName); err != nil { + if err := ignoreNotFound(gce.DeleteFirewall(loadBalancerName)); err != nil { return err } @@ -260,7 +260,7 @@ func (gce *GCECloud) teardownInternalHealthCheckAndFirewall(hcName string) error glog.V(2).Infof("teardownInternalHealthCheckAndFirewall(%v): health check deleted", hcName) hcFirewallName := makeHealthCheckFirewallNameFromHC(hcName) - if err := gce.DeleteFirewall(hcFirewallName); err != nil && !isNotFound(err) { + if err := ignoreNotFound(gce.DeleteFirewall(hcFirewallName)); err != nil { return fmt.Errorf("failed to delete health check firewall: %v, err: %v", hcFirewallName, err) } glog.V(2).Infof("teardownInternalHealthCheckAndFirewall(%v): health check firewall deleted", hcFirewallName) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/cloud/nodecontroller.go b/vendor/k8s.io/kubernetes/pkg/controller/cloud/nodecontroller.go index d3841930a5ad..9e7660a39f57 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/cloud/nodecontroller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/cloud/nodecontroller.go @@ -189,7 +189,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) { return } - _, err = nodeutil.PatchNodeStatus(cnc.kubeClient, types.NodeName(node.Name), node, newNode) + _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode) if err != nil { glog.Errorf("Error patching node with cloud ip addresses = [%v]", err) } diff --git a/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/BUILD index d34cdab65eea..ce26a741afbc 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/BUILD @@ -35,6 +35,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", @@ -62,6 +63,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller.go index 8c0a6325c217..07820873371c 100644 --- 
a/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -51,11 +52,11 @@ type ReplenishmentControllerOptions struct { } // PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not -func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) { +func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions, clock clock.Clock) func(oldObj, newObj interface{}) { return func(oldObj, newObj interface{}) { oldPod := oldObj.(*v1.Pod) newPod := newObj.(*v1.Pod) - if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) { + if core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock) { options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod) } } @@ -115,9 +116,10 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon if err != nil { return nil, err } + clock := clock.RealClock{} informer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ - UpdateFunc: PodReplenishmentUpdateFunc(options), + UpdateFunc: PodReplenishmentUpdateFunc(options, clock), DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, options.ResyncPeriod(), diff --git a/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller_test.go index 7a1224b29382..b4ffa4f19bd3 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/resourcequota/replenishment_controller_test.go @@ -18,10 +18,12 @@ package resourcequota import ( "testing" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -55,7 +57,8 @@ func TestPodReplenishmentUpdateFunc(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"}, Status: v1.PodStatus{Phase: v1.PodFailed}, } - updateFunc := PodReplenishmentUpdateFunc(&options) + fakeClock := clock.NewFakeClock(time.Now()) + updateFunc := PodReplenishmentUpdateFunc(&options, fakeClock) updateFunc(oldPod, newPod) if mockReplenish.groupKind != api.Kind("Pod") { t.Errorf("Unexpected group kind %v", mockReplenish.groupKind) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_control.go b/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_control.go index 2d730bbebd8e..b5032d1da2dc 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_control.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_control.go @@ -404,6 +404,21 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( replicas[i].Name) return &status, nil } + // Enforce the StatefulSet invariants - we do this without respect to the Pod's readiness so that the endpoints + // controller can be notified of identity changes if a Pod becomes unready due to a DNS inconsistency with respect + // to the 
Pods identity. + if !identityMatches(set, replicas[i]) || !storageMatches(set, replicas[i]) { + // Make a deep copy so we don't mutate the shared cache + copy, err := scheme.Scheme.DeepCopy(replicas[i]) + if err != nil { + return &status, err + } + replica := copy.(*v1.Pod) + if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil { + return &status, err + } + } + // If we have a Pod that has been created but is not running and ready we can not make progress. // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Running and Ready. @@ -415,19 +430,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( replicas[i].Name) return &status, nil } - // Enforce the StatefulSet invariants - if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) { - continue - } - // Make a deep copy so we don't mutate the shared cache - copy, err := scheme.Scheme.DeepCopy(replicas[i]) - if err != nil { - return &status, err - } - replica := copy.(*v1.Pod) - if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil { - return &status, err - } } // At this point, all of the current Replicas are Running and Ready, we can consider termination. diff --git a/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils.go b/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils.go index 7d240c6cd3e0..5cc0c2b4743b 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils.go @@ -116,7 +116,9 @@ func identityMatches(set *apps.StatefulSet, pod *v1.Pod) bool { return ordinal >= 0 && set.Name == parent && pod.Name == getPodName(set, ordinal) && - pod.Namespace == set.Namespace + pod.Namespace == set.Namespace && + pod.Spec.Hostname != "" && + (pod.Spec.Subdomain != "" || set.Spec.ServiceName == "") } // storageMatches returns true if pod's Volumes cover the set of PersistentVolumeClaims @@ -195,7 +197,12 @@ func initIdentity(set *apps.StatefulSet, pod *v1.Pod) { func updateIdentity(set *apps.StatefulSet, pod *v1.Pod) { pod.Name = getPodName(set, getOrdinal(pod)) pod.Namespace = set.Namespace - + if pod.Spec.Hostname == "" { + pod.Spec.Hostname = pod.Name + } + if pod.Spec.Subdomain == "" { + pod.Spec.Subdomain = set.Spec.ServiceName + } } // isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady, and if the init diff --git a/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils_test.go b/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils_test.go index 817078caf78a..497bb499fa6a 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils_test.go @@ -128,6 +128,22 @@ func TestUpdateIdentity(t *testing.T) { if !identityMatches(set, pod) { t.Error("updateIdentity failed to update the Pods namespace") } + pod.Spec.Hostname = "" + pod.Spec.Subdomain = "" + updateIdentity(set, pod) + if pod.Spec.Hostname != pod.Name || pod.Spec.Subdomain != set.Spec.ServiceName { + t.Errorf("want hostame=%s subdomain=%s got hostname=%s subdomain=%s", + pod.Name, + set.Spec.ServiceName, + pod.Spec.Hostname, + set.Spec.ServiceName) + } + pod.Spec.Hostname = "foo" + pod.Spec.Subdomain = "bar" + updateIdentity(set, pod) + if pod.Spec.Hostname != "foo" || pod.Spec.Subdomain != "bar" { + t.Errorf("want hostame=foo 
subdomain=bar got hostname=%s subdomain=%s", pod.Spec.Hostname, set.Spec.ServiceName) + } } func TestUpdateStorage(t *testing.T) { diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD b/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD index f367db55f1dc..05898c6b1d1b 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/BUILD @@ -228,6 +228,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/BUILD b/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/BUILD index 74278f18008a..6d68ccdf106c 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/BUILD @@ -59,7 +59,6 @@ go_library( library = ":cgo_codegen", tags = ["automanaged"], deps = [ - "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/api/v1/helper/qos:go_default_library", "//pkg/features:go_default_library", @@ -72,7 +71,6 @@ go_library( "//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/format:go_default_library", - "//pkg/quota/evaluator/core:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/helpers.go b/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/helpers.go index 835054ed3ee9..5acd95624ecc 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/helpers.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/eviction/helpers.go @@ -26,14 +26,12 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cm" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" "k8s.io/kubernetes/pkg/kubelet/server/stats" - "k8s.io/kubernetes/pkg/quota/evaluator/core" ) const ( @@ -581,19 +579,11 @@ func memory(stats statsFunc) cmpFunc { // adjust p1, p2 usage relative to the request (if any) p1Memory := p1Usage[v1.ResourceMemory] - p1Spec, err := core.PodUsageFunc(p1) - if err != nil { - return -1 - } - p1Request := p1Spec[api.ResourceRequestsMemory] + p1Request := podMemoryRequest(p1) p1Memory.Sub(p1Request) p2Memory := p2Usage[v1.ResourceMemory] - p2Spec, err := core.PodUsageFunc(p2) - if err != nil { - return 1 - } - p2Request := p2Spec[api.ResourceRequestsMemory] + p2Request := podMemoryRequest(p2) p2Memory.Sub(p2Request) // if p2 is using more than p1, we want p2 first @@ -601,6 +591,23 @@ func memory(stats statsFunc) cmpFunc { } } +// podMemoryRequest returns the total memory request of a pod which is the +// max(sum of init container requests, sum of container requests) +func podMemoryRequest(pod *v1.Pod) resource.Quantity { + containerValue := resource.Quantity{Format: resource.BinarySI} + for i := range pod.Spec.Containers { + 
containerValue.Add(*pod.Spec.Containers[i].Resources.Requests.Memory()) + } + initValue := resource.Quantity{Format: resource.BinarySI} + for i := range pod.Spec.InitContainers { + initValue.Add(*pod.Spec.InitContainers[i].Resources.Requests.Memory()) + } + if containerValue.Cmp(initValue) > 0 { + return containerValue + } + return initValue +} + // disk compares pods by largest consumer of disk relative to request for the specified disk resource. func disk(stats statsFunc, fsStatsToMeasure []fsStatsType, diskResource v1.ResourceName) cmpFunc { return func(p1, p2 *v1.Pod) int { diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go index 0bebe417ffcc..96d180faf09b 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet.go @@ -57,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/apis/componentconfig" componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + v1coregenerated "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/features" @@ -226,6 +227,7 @@ type KubeletDeps struct { ContainerManager cm.ContainerManager DockerClient libdocker.Interface EventClient v1core.EventsGetter + HeartbeatClient v1coregenerated.CoreV1Interface KubeClient clientset.Interface ExternalKubeClient clientgoclientset.Interface Mounter mount.Interface @@ -438,6 +440,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub hostname: hostname, nodeName: nodeName, kubeClient: kubeDeps.KubeClient, + heartbeatClient: kubeDeps.HeartbeatClient, rootDirectory: kubeCfg.RootDirectory, resyncInterval: kubeCfg.SyncFrequency.Duration, sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources), @@ -836,12 +839,13 @@ type serviceLister interface { type Kubelet struct { kubeletConfiguration componentconfig.KubeletConfiguration - hostname string - nodeName types.NodeName - runtimeCache kubecontainer.RuntimeCache - kubeClient clientset.Interface - iptClient utilipt.Interface - rootDirectory string + hostname string + nodeName types.NodeName + runtimeCache kubecontainer.RuntimeCache + kubeClient clientset.Interface + heartbeatClient v1coregenerated.CoreV1Interface + iptClient utilipt.Interface + rootDirectory string // podWorkers handle syncing Pods in response to events. podWorkers PodWorkers diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go index a07560715e6a..8f1b941e7834 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go @@ -137,7 +137,7 @@ func (kl *Kubelet) tryRegisterWithApiServer(node *v1.Node) bool { // annotation. 
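A note on the HeartbeatClient plumbing being added above, which the node-status code further down consumes: it is constructed from a copy of the normal API-server config with client-side throttling disabled and a request timeout tied to the node-status update frequency, so a single wedged request cannot block heartbeats indefinitely. A self-contained sketch of just that configuration step, using the client-go rest package the patch already pulls into its tests:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/rest"
)

// buildHeartbeatConfig shows the shape of the dedicated heartbeat client's
// configuration: a copy of the ordinary API-server config with client-side
// throttling disabled (QPS < 0) and a request timeout bound to how often the
// node status is reported.
func buildHeartbeatConfig(base *rest.Config, nodeStatusUpdateFrequency time.Duration) *rest.Config {
	cfg := *base // shallow copy, the same way the patch copies *clientConfig
	cfg.QPS = -1
	cfg.Timeout = nodeStatusUpdateFrequency
	return &cfg
}

func main() {
	base := &rest.Config{Host: "https://127.0.0.1:6443", QPS: 5, Burst: 10}
	hb := buildHeartbeatConfig(base, 10*time.Second)
	fmt.Printf("heartbeat config: QPS=%v Timeout=%v (base QPS=%v untouched)\n", hb.QPS, hb.Timeout, base.QPS)
}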
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) if requiresUpdate { - if _, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), + if _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil { glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) return false @@ -361,7 +361,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { // It synchronizes node status to master, registering the kubelet first if // necessary. func (kl *Kubelet) syncNodeStatus() { - if kl.kubeClient == nil { + if kl.kubeClient == nil || kl.heartbeatClient == nil { return } if kl.registerNode { @@ -398,7 +398,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { if tryNumber == 0 { util.FromApiserverCache(&opts) } - node, err := kl.kubeClient.Core().Nodes().Get(string(kl.nodeName), opts) + node, err := kl.heartbeatClient.Nodes().Get(string(kl.nodeName), opts) if err != nil { return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) } @@ -417,7 +417,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { kl.setNodeStatus(node) // Patch the current status on the API server - updatedNode, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), originalNode, node) + updatedNode, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node) if err != nil { return err } diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go index daf0c4278de2..213ef42e6786 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status_test.go @@ -19,9 +19,11 @@ package kubelet import ( "encoding/json" "fmt" + "net" goruntime "runtime" "sort" "strconv" + "sync/atomic" "testing" "time" @@ -40,9 +42,11 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" core "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + v1coregenerated "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" @@ -133,6 +137,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { t, inputImageList, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ @@ -334,6 +339,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ @@ -685,10 +691,65 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) } } +func TestUpdateExistingNodeStatusTimeout(t *testing.T) { + attempts := int64(0) + + // set up a listener that hangs connections + ln, err := 
net.Listen("tcp", "127.0.0.1:0") + assert.NoError(t, err) + defer ln.Close() + go func() { + // accept connections and just let them hang + for { + _, err := ln.Accept() + if err != nil { + t.Log(err) + return + } + t.Log("accepted connection") + atomic.AddInt64(&attempts, 1) + } + }() + + config := &rest.Config{ + Host: "http://" + ln.Addr().String(), + QPS: -1, + Timeout: time.Second, + } + assert.NoError(t, err) + + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used + kubelet.heartbeatClient, err = v1coregenerated.NewForConfig(config) + + kubelet.containerManager = &localCM{ + ContainerManager: cm.NewStubContainerManager(), + allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), + }, + capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + }, + } + + // should return an error, but not hang + assert.Error(t, kubelet.updateNodeStatus()) + + // should have attempted multiple times + if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts != nodeStatusUpdateRetry { + t.Errorf("Expected %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts) + } +} + func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ @@ -900,6 +961,7 @@ func TestUpdateNodeStatusError(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used // No matching node for the kubelet testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain assert.Error(t, kubelet.updateNodeStatus()) @@ -1149,6 +1211,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { t, inputImageList, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_test.go b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_test.go index 59000904a345..2bbab9211c14 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_test.go +++ b/vendor/k8s.io/kubernetes/pkg/kubelet/kubelet_test.go @@ -154,6 +154,7 @@ func newTestKubeletWithImageList( kubelet := &Kubelet{} kubelet.recorder = fakeRecorder kubelet.kubeClient = fakeKubeClient + kubelet.heartbeatClient = fakeKubeClient.CoreV1() kubelet.os = &containertest.FakeOS{} kubelet.hostname = testKubeletHostname diff --git a/vendor/k8s.io/kubernetes/pkg/kubemark/hollow_kubelet.go b/vendor/k8s.io/kubernetes/pkg/kubemark/hollow_kubelet.go index 88da1ffcf6b1..6864e776feb4 100644 --- a/vendor/k8s.io/kubernetes/pkg/kubemark/hollow_kubelet.go +++ 
b/vendor/k8s.io/kubernetes/pkg/kubemark/hollow_kubelet.go @@ -68,6 +68,7 @@ func NewHollowKubelet( volumePlugins = append(volumePlugins, secret.ProbeVolumePlugins()...) d := &kubelet.KubeletDeps{ KubeClient: client, + HeartbeatClient: client.CoreV1(), DockerClient: dockerClient, CAdvisorInterface: cadvisorInterface, Cloud: nil, diff --git a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/BUILD b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/BUILD index 82c5d4897549..dcb537c75474 100644 --- a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/BUILD @@ -36,6 +36,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/apiserver/pkg/admission:go_default_library", @@ -55,8 +56,10 @@ go_test( "//pkg/api:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/quota:go_default_library", + "//pkg/util/node:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", ], ) diff --git a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods.go b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods.go index c9ea8aff2ee2..926fef4459e6 100644 --- a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods.go +++ b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods.go @@ -19,11 +19,14 @@ package core import ( "fmt" "strings" + "time" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apiserver/pkg/admission" @@ -68,13 +71,14 @@ func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic. // NewPodEvaluator returns an evaluator that can evaluate pods // if the specified shared informer factory is not nil, evaluator may use it to support listing functions. 
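The constructor change that follows threads a clock.Clock into the pod evaluator so the grace-period check added further down can be exercised deterministically: production wiring passes clock.RealClock{}, tests pass a fake clock and step it past the deadline. A minimal sketch of that pattern around the same predicate (the helper function here is illustrative, not the patched QuotaPod itself):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

// gracePeriodExpired captures the rule the patched QuotaPod/QuotaV1Pod rely on:
// a deleted pod stops counting against quota once its deletion timestamp plus
// grace period is in the past, as measured by the injected clock.
func gracePeriodExpired(c clock.Clock, deletionTime time.Time, gracePeriodSeconds int64) bool {
	grace := time.Duration(gracePeriodSeconds) * time.Second
	return c.Now().After(deletionTime.Add(grace))
}

func main() {
	// Production callers (the quota registry) pass a real clock.
	_ = clock.RealClock{}

	// Tests pass a fake clock so expiry can be simulated without sleeping.
	fake := clock.NewFakeClock(time.Now())
	deletedAt := fake.Now()
	fmt.Println(gracePeriodExpired(fake, deletedAt, 30)) // false: still inside the grace period
	fake.Step(31 * time.Second)
	fmt.Println(gracePeriodExpired(fake, deletedAt, 30)) // true: the pod no longer counts against quota
}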
-func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator { +func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory, clock clock.Clock) quota.Evaluator { listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient) if f != nil { listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("pods")) } return &podEvaluator{ listFuncByNamespace: listFuncByNamespace, + clock: clock, } } @@ -82,6 +86,8 @@ func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerF type podEvaluator struct { // knows how to list pods listFuncByNamespace generic.ListFuncByNamespace + // used to track time + clock clock.Clock } // Constraints verifies that all required resources are present on the pod @@ -148,7 +154,8 @@ func (p *podEvaluator) MatchingResources(input []api.ResourceName) []api.Resourc // Usage knows how to measure usage associated with pods func (p *podEvaluator) Usage(item runtime.Object) (api.ResourceList, error) { - return PodUsageFunc(item) + // delegate to normal usage + return PodUsageFunc(item, p.clock) } // UsageStats calculates aggregate usage for the object. @@ -231,20 +238,22 @@ func podMatchesScopeFunc(scope api.ResourceQuotaScope, object runtime.Object) (b return false, nil } -// PodUsageFunc knows how to measure usage associated with pods -func PodUsageFunc(obj runtime.Object) (api.ResourceList, error) { +// PodUsageFunc returns the quota usage for a pod. +// A pod is charged for quota if the following are not true. +// - pod has a terminal phase (failed or succeeded) +// - pod has been marked for deletion and grace period has expired +func PodUsageFunc(obj runtime.Object, clock clock.Clock) (api.ResourceList, error) { pod, err := toInternalPodOrError(obj) if err != nil { return api.ResourceList{}, err } - // by convention, we do not quota pods that have reached an end-of-life state - if !QuotaPod(pod) { + // by convention, we do not quota pods that have reached end-of life + if !QuotaPod(pod, clock) { return api.ResourceList{}, nil } requests := api.ResourceList{} limits := api.ResourceList{} - // TODO: fix this when we have pod level cgroups - // when we have pod level cgroups, we can just read pod level requests/limits + // TODO: ideally, we have pod level requests and limits in the future. for i := range pod.Spec.Containers { requests = quota.Add(requests, pod.Spec.Containers[i].Resources.Requests) limits = quota.Add(limits, pod.Spec.Containers[i].Resources.Limits) @@ -272,12 +281,47 @@ func isTerminating(pod *api.Pod) bool { } // QuotaPod returns true if the pod is eligible to track against a quota -func QuotaPod(pod *api.Pod) bool { - return !(api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase) +// A pod is eligible for quota, unless any of the following are true: +// - pod has a terminal phase (failed or succeeded) +// - pod has been marked for deletion and grace period has expired. +func QuotaPod(pod *api.Pod, clock clock.Clock) bool { + // if pod is terminal, ignore it for quota + if api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase { + return false + } + // deleted pods that should be gone should not be charged to user quota. + // this can happen if a node is lost, and the kubelet is never able to confirm deletion. + // even though the cluster may have drifting clocks, quota makes a reasonable effort + // to balance cluster needs against user needs. 
user's do not control clocks, + // but at worst a small drive in clocks will only slightly impact quota. + if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil { + now := clock.Now() + deletionTime := pod.DeletionTimestamp.Time + gracePeriod := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second + if now.After(deletionTime.Add(gracePeriod)) { + return false + } + } + return true } // QuotaV1Pod returns true if the pod is eligible to track against a quota // if it's not in a terminal state according to its phase. -func QuotaV1Pod(pod *v1.Pod) bool { - return !(v1.PodFailed == pod.Status.Phase || v1.PodSucceeded == pod.Status.Phase) +func QuotaV1Pod(pod *v1.Pod, clock clock.Clock) bool { + // if pod is terminal, ignore it for quota + if v1.PodFailed == pod.Status.Phase || v1.PodSucceeded == pod.Status.Phase { + return false + } + // if pods are stuck terminating (for example, a node is lost), we do not want + // to charge the user for that pod in quota because it could prevent them from + // scaling up new pods to service their application. + if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil { + now := clock.Now() + deletionTime := pod.DeletionTimestamp.Time + gracePeriod := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second + if now.After(deletionTime.Add(gracePeriod)) { + return false + } + } + return true } diff --git a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods_test.go b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods_test.go index 5c64630950aa..a6fa5c2a99df 100644 --- a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods_test.go +++ b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/pods_test.go @@ -18,11 +18,15 @@ package core import ( "testing" + "time" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/quota" + "k8s.io/kubernetes/pkg/util/node" ) func TestPodConstraintsFunc(t *testing.T) { @@ -87,7 +91,7 @@ func TestPodConstraintsFunc(t *testing.T) { }, } kubeClient := fake.NewSimpleClientset() - evaluator := NewPodEvaluator(kubeClient, nil) + evaluator := NewPodEvaluator(kubeClient, nil, clock.RealClock{}) for testName, test := range testCases { err := evaluator.Constraints(test.required, test.pod) switch { @@ -101,7 +105,16 @@ func TestPodConstraintsFunc(t *testing.T) { func TestPodEvaluatorUsage(t *testing.T) { kubeClient := fake.NewSimpleClientset() - evaluator := NewPodEvaluator(kubeClient, nil) + fakeClock := clock.NewFakeClock(time.Now()) + evaluator := NewPodEvaluator(kubeClient, nil, fakeClock) + + // fields use to simulate a pod undergoing termination + // note: we set the deletion time in the past + now := fakeClock.Now() + terminationGracePeriodSeconds := int64(30) + deletionTimestampPastGracePeriod := metav1.NewTime(now.Add(time.Duration(terminationGracePeriodSeconds) * time.Second * time.Duration(-2))) + deletionTimestampNotPastGracePeriod := metav1.NewTime(fakeClock.Now()) + testCases := map[string]struct { pod *api.Pod usage api.ResourceList @@ -245,6 +258,66 @@ func TestPodEvaluatorUsage(t *testing.T) { api.ResourceMemory: resource.MustParse("100M"), }, }, + "pod deletion timestamp exceeded": { + pod: &api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &deletionTimestampPastGracePeriod, + DeletionGracePeriodSeconds: &terminationGracePeriodSeconds, + }, + Status: api.PodStatus{ + 
Reason: node.NodeUnreachablePodReason, + }, + Spec: api.PodSpec{ + TerminationGracePeriodSeconds: &terminationGracePeriodSeconds, + Containers: []api.Container{ + { + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceCPU: resource.MustParse("1"), + api.ResourceMemory: resource.MustParse("50M"), + }, + Limits: api.ResourceList{ + api.ResourceCPU: resource.MustParse("2"), + api.ResourceMemory: resource.MustParse("100M"), + }, + }, + }, + }, + }, + }, + usage: api.ResourceList{}, + }, + "pod deletion timestamp not exceeded": { + pod: &api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &deletionTimestampNotPastGracePeriod, + DeletionGracePeriodSeconds: &terminationGracePeriodSeconds, + }, + Status: api.PodStatus{ + Reason: node.NodeUnreachablePodReason, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceCPU: resource.MustParse("1"), + }, + Limits: api.ResourceList{ + api.ResourceCPU: resource.MustParse("2"), + }, + }, + }, + }, + }, + }, + usage: api.ResourceList{ + api.ResourceRequestsCPU: resource.MustParse("1"), + api.ResourceLimitsCPU: resource.MustParse("2"), + api.ResourcePods: resource.MustParse("1"), + api.ResourceCPU: resource.MustParse("1"), + }, + }, } for testName, testCase := range testCases { actual, err := evaluator.Usage(testCase.pod) diff --git a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/registry.go b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/registry.go index 9276ef332812..bf370ed9d357 100644 --- a/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/registry.go +++ b/vendor/k8s.io/kubernetes/pkg/quota/evaluator/core/registry.go @@ -18,6 +18,7 @@ package core import ( "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/quota" @@ -27,7 +28,7 @@ import ( // NewRegistry returns a registry that knows how to deal with core kubernetes resources // If an informer factory is provided, evaluators will use them. 
func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry { - pod := NewPodEvaluator(kubeClient, f) + pod := NewPodEvaluator(kubeClient, f, clock.RealClock{}) service := NewServiceEvaluator(kubeClient, f) replicationController := NewReplicationControllerEvaluator(kubeClient, f) resourceQuota := NewResourceQuotaEvaluator(kubeClient, f) diff --git a/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/BUILD b/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/BUILD index 9fa78afc5db4..f889f78350dc 100644 --- a/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/BUILD @@ -17,6 +17,7 @@ go_library( "//pkg/kubelet/client:go_default_library", "//pkg/registry/core/node:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic/rest:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/proxy.go b/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/proxy.go index 627ec716ccb8..ca4125832a97 100644 --- a/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/proxy.go +++ b/vendor/k8s.io/kubernetes/pkg/registry/core/node/rest/proxy.go @@ -20,9 +20,9 @@ import ( "fmt" "net/http" "net/url" - "path" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/net" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" genericrest "k8s.io/apiserver/pkg/registry/generic/rest" @@ -70,7 +70,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti if err != nil { return nil, err } - location.Path = path.Join("/", location.Path, proxyOpts.Path) + location.Path = net.JoinPreservingTrailingSlash(location.Path, proxyOpts.Path) // Return a proxy handler that uses the desired transport, wrapped with additional proxy handling (to get URL rewriting, X-Forwarded-* headers, etc) return newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder), nil } diff --git a/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/BUILD b/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/BUILD index 79a450de886d..3229282bea57 100644 --- a/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/registry/core/pod:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic/rest:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/subresources.go b/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/subresources.go index 3b54089a4604..5436ce19903e 100644 --- a/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/subresources.go +++ b/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/subresources.go @@ -20,9 +20,9 @@ import ( "fmt" "net/http" "net/url" - "path" "k8s.io/apimachinery/pkg/runtime" + 
"k8s.io/apimachinery/pkg/util/net" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" genericrest "k8s.io/apiserver/pkg/registry/generic/rest" @@ -69,7 +69,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti if err != nil { return nil, err } - location.Path = path.Join("/", location.Path, proxyOpts.Path) + location.Path = net.JoinPreservingTrailingSlash(location.Path, proxyOpts.Path) // Return a proxy handler that uses the desired transport, wrapped with additional proxy handling (to get URL rewriting, X-Forwarded-* headers, etc) return newThrottledUpgradeAwareProxyHandler(location, transport, true, false, false, responder), nil } diff --git a/vendor/k8s.io/kubernetes/pkg/registry/core/service/proxy.go b/vendor/k8s.io/kubernetes/pkg/registry/core/service/proxy.go index e4fc0d4e4d9a..754a74cce410 100644 --- a/vendor/k8s.io/kubernetes/pkg/registry/core/service/proxy.go +++ b/vendor/k8s.io/kubernetes/pkg/registry/core/service/proxy.go @@ -20,9 +20,9 @@ import ( "fmt" "net/http" "net/url" - "path" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/net" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" genericrest "k8s.io/apiserver/pkg/registry/generic/rest" "k8s.io/apiserver/pkg/registry/rest" @@ -66,7 +66,7 @@ func (r *ProxyREST) Connect(ctx genericapirequest.Context, id string, opts runti if err != nil { return nil, err } - location.Path = path.Join("/", location.Path, proxyOpts.Path) + location.Path = net.JoinPreservingTrailingSlash(location.Path, proxyOpts.Path) // Return a proxy handler that uses the desired transport, wrapped with additional proxy handling (to get URL rewriting, X-Forwarded-* headers, etc) return newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder), nil } diff --git a/vendor/k8s.io/kubernetes/pkg/util/node/BUILD b/vendor/k8s.io/kubernetes/pkg/util/node/BUILD index f7533c796392..e6ee8274dd2f 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/node/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/util/node/BUILD @@ -16,6 +16,7 @@ go_library( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library", "//pkg/kubelet/apis:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/vendor/k8s.io/kubernetes/pkg/util/node/node.go b/vendor/k8s.io/kubernetes/pkg/util/node/node.go index a351667bf28b..15c6cd32f64b 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/node/node.go +++ b/vendor/k8s.io/kubernetes/pkg/util/node/node.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" ) @@ -149,7 +150,7 @@ func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.N } // PatchNodeStatus patches node status. 
-func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { +func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { oldData, err := json.Marshal(oldNode) if err != nil { return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) @@ -170,7 +171,7 @@ func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1 return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) } - updatedNode, err := c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") + updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") if err != nil { return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err) } diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/api/meta/restmapper.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/api/meta/restmapper.go index 4b86f64162f2..f81d47feee17 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/api/meta/restmapper.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/api/meta/restmapper.go @@ -117,7 +117,10 @@ func NewDefaultRESTMapper(defaultGroupVersions []schema.GroupVersion, f VersionI func (m *DefaultRESTMapper) Add(kind schema.GroupVersionKind, scope RESTScope) { plural, singular := UnsafeGuessKindToResource(kind) + m.AddSpecific(kind, plural, singular, scope) +} +func (m *DefaultRESTMapper) AddSpecific(kind schema.GroupVersionKind, plural, singular schema.GroupVersionResource, scope RESTScope) { m.singularToPlural[singular] = plural m.pluralToSingular[plural] = singular diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http.go index adb80813be2b..edf77dc0966f 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http.go @@ -26,6 +26,7 @@ import ( "net/http" "net/url" "os" + "path" "strconv" "strings" @@ -33,6 +34,26 @@ import ( "golang.org/x/net/http2" ) +// JoinPreservingTrailingSlash does a path.Join of the specified elements, +// preserving any trailing slash on the last non-empty segment +func JoinPreservingTrailingSlash(elem ...string) string { + // do the basic path join + result := path.Join(elem...) + + // find the last non-empty segment + for i := len(elem) - 1; i >= 0; i-- { + if len(elem[i]) > 0 { + // if the last segment ended in a slash, ensure our result does as well + if strings.HasSuffix(elem[i], "/") && !strings.HasSuffix(result, "/") { + result += "/" + } + break + } + } + + return result +} + // IsProbableEOF returns true if the given error resembles a connection termination // scenario that would justify assuming that the watch is empty. 
// These errors are what the Go http stack returns back to us which are general diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go index 2d41eda49d72..8f5dd9cdffae 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go @@ -20,6 +20,7 @@ package net import ( "crypto/tls" + "fmt" "net" "net/http" "net/url" @@ -218,3 +219,40 @@ func TestTLSClientConfigHolder(t *testing.T) { t.Errorf("didn't find tls config") } } + +func TestJoinPreservingTrailingSlash(t *testing.T) { + tests := []struct { + a string + b string + want string + }{ + // All empty + {"", "", ""}, + + // Empty a + {"", "/", "/"}, + {"", "foo", "foo"}, + {"", "/foo", "/foo"}, + {"", "/foo/", "/foo/"}, + + // Empty b + {"/", "", "/"}, + {"foo", "", "foo"}, + {"/foo", "", "/foo"}, + {"/foo/", "", "/foo/"}, + + // Both populated + {"/", "/", "/"}, + {"foo", "foo", "foo/foo"}, + {"/foo", "/foo", "/foo/foo"}, + {"/foo/", "/foo/", "/foo/foo/"}, + } + for _, tt := range tests { + name := fmt.Sprintf("%q+%q=%q", tt.a, tt.b, tt.want) + t.Run(name, func(t *testing.T) { + if got := JoinPreservingTrailingSlash(tt.a, tt.b); got != tt.want { + t.Errorf("JoinPreservingTrailingSlash() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 0c4730dba17d..880546abb0fa 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -1953,15 +1953,25 @@ func TestGetWithOptions(t *testing.T) { requestURL: "/namespaces/default/simple/id?param1=test1&param2=test2", expectedPath: "", }, + { + name: "with root slash", + requestURL: "/namespaces/default/simple/id/?param1=test1&param2=test2", + expectedPath: "/", + }, { name: "with path", requestURL: "/namespaces/default/simple/id/a/different/path?param1=test1&param2=test2", - expectedPath: "a/different/path", + expectedPath: "/a/different/path", + }, + { + name: "with path with trailing slash", + requestURL: "/namespaces/default/simple/id/a/different/path/?param1=test1&param2=test2", + expectedPath: "/a/different/path/", }, { name: "as subresource", requestURL: "/namespaces/default/simple/id/subresource/another/different/path?param1=test1&param2=test2", - expectedPath: "another/different/path", + expectedPath: "/another/different/path", }, { name: "cluster-scoped basic", @@ -1973,13 +1983,13 @@ { name: "cluster-scoped basic with path", rootScoped: true, requestURL: "/simple/id/a/cluster/path?param1=test1&param2=test2", - expectedPath: "a/cluster/path", + expectedPath: "/a/cluster/path", }, { name: "cluster-scoped basic as subresource", rootScoped: true, requestURL: "/simple/id/subresource/another/cluster/path?param1=test1&param2=test2", - expectedPath: "another/cluster/path", + expectedPath: "/another/cluster/path", }, } @@ -2374,7 +2384,7 @@ func TestConnectWithOptions(t *testing.T) { func TestConnectWithOptionsAndPath(t *testing.T) { responseText := "Hello World" itemID := "theID" - testPath := "a/b/c/def" + testPath := "/a/b/c/def" connectStorage := &ConnecterRESTStorage{ connectHandler: &OutputConnect{ response: responseText, @@ 
-2390,7 +2400,7 @@ func TestConnectWithOptionsAndPath(t *testing.T) { server := httptest.NewServer(handler) defer server.Close() - resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/" + itemID + "/connect/" + testPath + "?param1=value1&param2=value2") + resp, err := http.Get(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/" + itemID + "/connect" + testPath + "?param1=value1&param2=value2") if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index f238d447d326..e23be1ef411c 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -212,7 +212,20 @@ func getRequestOptions(req *http.Request, scope RequestScope, into runtime.Objec if isSubresource { startingIndex = 3 } - newQuery[subpathKey] = []string{strings.Join(requestInfo.Parts[startingIndex:], "/")} + + p := strings.Join(requestInfo.Parts[startingIndex:], "/") + + // ensure non-empty subpaths correctly reflect a leading slash + if len(p) > 0 && !strings.HasPrefix(p, "/") { + p = "/" + p + } + + // ensure subpaths correctly reflect the presence of a trailing slash on the original request + if strings.HasSuffix(requestInfo.Path, "/") && !strings.HasSuffix(p, "/") { + p += "/" + } + + newQuery[subpathKey] = []string{p} query = newQuery } return scope.ParameterCodec.DecodeParameters(query, scope.Kind.GroupVersion(), into) diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 708018cd49b3..bd3b94648879 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -29,6 +29,7 @@ import ( "github.com/golang/glog" "golang.org/x/net/context" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" @@ -263,6 +264,14 @@ func (s *store) GuaranteedUpdate( } key = path.Join(s.pathPrefix, key) + getCurrentState := func() (*objState, error) { + getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + if err != nil { + return nil, err + } + return s.getState(getResp, key, v, ignoreNotFound) + } + var origState *objState var mustCheckData bool if len(suggestion) == 1 && suggestion[0] != nil { @@ -272,11 +281,7 @@ func (s *store) GuaranteedUpdate( } mustCheckData = true } else { - getResp, err := s.client.KV.Get(ctx, key, s.getOps...) 
- if err != nil { - return err - } - origState, err = s.getState(getResp, key, v, ignoreNotFound) + origState, err = getCurrentState() if err != nil { return err } @@ -291,6 +296,18 @@ func (s *store) GuaranteedUpdate( ret, ttl, err := s.updateState(origState, tryUpdate) if err != nil { + // It's possible we were working with stale data + if mustCheckData && apierrors.IsConflict(err) { + // Actually fetch + origState, err = getCurrentState() + if err != nil { + return err + } + mustCheckData = false + // Retry + continue + } + return err } @@ -303,11 +320,7 @@ func (s *store) GuaranteedUpdate( // etcd in order to be sure the data in the store is equivalent to // our desired serialization if mustCheckData { - getResp, err := s.client.KV.Get(ctx, key, s.getOps...) - if err != nil { - return err - } - origState, err = s.getState(getResp, key, v, ignoreNotFound) + origState, err = getCurrentState() if err != nil { return err } diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go new file mode 100644 index 000000000000..f9ad9e296461 --- /dev/null +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -0,0 +1,800 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "bytes" + "errors" + "fmt" + "reflect" + "strconv" + "sync" + "testing" + + "github.com/coreos/etcd/integration" + "golang.org/x/net/context" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + apitesting "k8s.io/apimachinery/pkg/api/testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/apis/example" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/storage" + storagetests "k8s.io/apiserver/pkg/storage/tests" + "k8s.io/apiserver/pkg/storage/value" +) + +var scheme = runtime.NewScheme() +var codecs = serializer.NewCodecFactory(scheme) + +func init() { + metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) + example.AddToScheme(scheme) + examplev1.AddToScheme(scheme) +} + +// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. 
+type prefixTransformer struct { + prefix []byte + stale bool + err error +} + +func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) { + if ctx == nil { + panic("no context provided") + } + if !bytes.HasPrefix(b, p.prefix) { + return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b)) + } + return bytes.TrimPrefix(b, p.prefix), p.stale, p.err +} +func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) { + if ctx == nil { + panic("no context provided") + } + if len(b) > 0 { + return append(append([]byte{}, p.prefix...), b...), p.err + } + return b, p.err +} + +func TestCreate(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + etcdClient := cluster.RandClient() + + key := "/testkey" + out := &example.Pod{} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + + // verify that kv pair is empty before set + getResp, err := etcdClient.KV.Get(ctx, key) + if err != nil { + t.Fatalf("etcdClient.KV.Get failed: %v", err) + } + if len(getResp.Kvs) != 0 { + t.Fatalf("expecting empty result on key: %s", key) + } + + err = store.Create(ctx, key, obj, out, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + // basic tests of the output + if obj.ObjectMeta.Name != out.ObjectMeta.Name { + t.Errorf("pod name want=%s, get=%s", obj.ObjectMeta.Name, out.ObjectMeta.Name) + } + if out.ResourceVersion == "" { + t.Errorf("output should have non-empty resource version") + } + + // verify that kv pair is not empty after set + getResp, err = etcdClient.KV.Get(ctx, key) + if err != nil { + t.Fatalf("etcdClient.KV.Get failed: %v", err) + } + if len(getResp.Kvs) == 0 { + t.Fatalf("expecting non empty result on key: %s", key) + } +} + +func TestCreateWithTTL(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + key := "/somekey" + + out := &example.Pod{} + if err := store.Create(ctx, key, input, out, 1); err != nil { + t.Fatalf("Create failed: %v", err) + } + + w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckEventType(t, watch.Deleted, w) +} + +func TestCreateWithKeyExist(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + key, _ := testPropogateStore(ctx, t, store, obj) + out := &example.Pod{} + err := store.Create(ctx, key, obj, out, 0) + if err == nil || !storage.IsNodeExist(err) { + t.Errorf("expecting key exists error, but get: %s", err) + } +} + +func TestGet(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + tests := []struct { + key string + ignoreNotFound bool + expectNotFoundErr bool + expectedOut *example.Pod + }{{ // test get on existing item + key: key, + ignoreNotFound: false, + expectNotFoundErr: false, + expectedOut: storedObj, + }, { // test get on non-existing item with ignoreNotFound=false + key: "/non-existing", + ignoreNotFound: false, + expectNotFoundErr: true, + }, { // test get on non-existing item with ignoreNotFound=true + key: "/non-existing", + ignoreNotFound: true, + expectNotFoundErr: false, + expectedOut: &example.Pod{}, + }} + + for i, tt := range tests { + out := &example.Pod{} + err := 
store.Get(ctx, tt.key, "", out, tt.ignoreNotFound) + if tt.expectNotFoundErr { + if err == nil || !storage.IsNotFound(err) { + t.Errorf("#%d: expecting not found error, but get: %s", i, err) + } + continue + } + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if !reflect.DeepEqual(tt.expectedOut, out) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedOut, out) + } + } +} + +func TestUnconditionalDelete(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + tests := []struct { + key string + expectedObj *example.Pod + expectNotFoundErr bool + }{{ // test unconditional delete on existing key + key: key, + expectedObj: storedObj, + expectNotFoundErr: false, + }, { // test unconditional delete on non-existing key + key: "/non-existing", + expectedObj: nil, + expectNotFoundErr: true, + }} + + for i, tt := range tests { + out := &example.Pod{} // reset + err := store.Delete(ctx, tt.key, out, nil) + if tt.expectNotFoundErr { + if err == nil || !storage.IsNotFound(err) { + t.Errorf("#%d: expecting not found error, but get: %s", i, err) + } + continue + } + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + if !reflect.DeepEqual(tt.expectedObj, out) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, tt.expectedObj, out) + } + } +} + +func TestConditionalDelete(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) + + tests := []struct { + precondition *storage.Preconditions + expectInvalidObjErr bool + }{{ // test conditional delete with UID match + precondition: storage.NewUIDPreconditions("A"), + expectInvalidObjErr: false, + }, { // test conditional delete with UID mismatch + precondition: storage.NewUIDPreconditions("B"), + expectInvalidObjErr: true, + }} + + for i, tt := range tests { + out := &example.Pod{} + err := store.Delete(ctx, key, out, tt.precondition) + if tt.expectInvalidObjErr { + if err == nil || !storage.IsInvalidObj(err) { + t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err) + } + continue + } + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + if !reflect.DeepEqual(storedObj, out) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, storedObj, out) + } + key, storedObj = testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) + } +} + +func TestGetToList(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + tests := []struct { + key string + pred storage.SelectionPredicate + expectedOut []*example.Pod + }{{ // test GetToList on existing key + key: key, + pred: storage.Everything, + expectedOut: []*example.Pod{storedObj}, + }, { // test GetToList on non-existing key + key: "/non-existing", + pred: storage.Everything, + expectedOut: nil, + }, { // test GetToList with matching pod name + key: "/non-existing", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: nil, + }} + + for 
i, tt := range tests { + out := &example.PodList{} + err := store.GetToList(ctx, tt.key, "", tt.pred, out) + if err != nil { + t.Fatalf("GetToList failed: %v", err) + } + if len(out.Items) != len(tt.expectedOut) { + t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) + continue + } + for j, wantPod := range tt.expectedOut { + getPod := &out.Items[j] + if !reflect.DeepEqual(wantPod, getPod) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + } + } + } +} + +func TestGuaranteedUpdate(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storeObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) + + tests := []struct { + key string + ignoreNotFound bool + precondition *storage.Preconditions + expectNotFoundErr bool + expectInvalidObjErr bool + expectNoUpdate bool + transformStale bool + }{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false + key: "/non-existing", + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: true, + expectInvalidObjErr: false, + expectNoUpdate: false, + }, { // GuaranteedUpdate on non-existing key with ignoreNotFound=true + key: "/non-existing", + ignoreNotFound: true, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: false, + }, { // GuaranteedUpdate on existing key + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: false, + }, { // GuaranteedUpdate with same data + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: true, + }, { // GuaranteedUpdate with same data but stale + key: key, + ignoreNotFound: false, + precondition: nil, + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: false, + transformStale: true, + }, { // GuaranteedUpdate with UID match + key: key, + ignoreNotFound: false, + precondition: storage.NewUIDPreconditions("A"), + expectNotFoundErr: false, + expectInvalidObjErr: false, + expectNoUpdate: true, + }, { // GuaranteedUpdate with UID mismatch + key: key, + ignoreNotFound: false, + precondition: storage.NewUIDPreconditions("B"), + expectNotFoundErr: false, + expectInvalidObjErr: true, + expectNoUpdate: true, + }} + + for i, tt := range tests { + out := &example.Pod{} + name := fmt.Sprintf("foo-%d", i) + if tt.expectNoUpdate { + name = storeObj.Name + } + originalTransformer := store.transformer.(prefixTransformer) + if tt.transformStale { + transformer := originalTransformer + transformer.stale = true + store.transformer = transformer + } + version := storeObj.ResourceVersion + err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + if tt.expectNotFoundErr && tt.ignoreNotFound { + if pod := obj.(*example.Pod); pod.Name != "" { + t.Errorf("#%d: expecting zero value, but get=%#v", i, pod) + } + } + pod := *storeObj + pod.Name = name + return &pod, nil + })) + store.transformer = originalTransformer + + if tt.expectNotFoundErr { + if err == nil || !storage.IsNotFound(err) { + t.Errorf("#%d: expecting not found error, but get: %v", i, err) + } + continue + } + if tt.expectInvalidObjErr { + if err == nil || !storage.IsInvalidObj(err) { + t.Errorf("#%d: expecting invalid UID error, but get: %s", i, err) + } + continue + } + if err != nil { + 
t.Fatalf("GuaranteedUpdate failed: %v", err) + } + if out.ObjectMeta.Name != name { + t.Errorf("#%d: pod name want=%s, get=%s", i, name, out.ObjectMeta.Name) + } + switch tt.expectNoUpdate { + case true: + if version != out.ResourceVersion { + t.Errorf("#%d: expect no version change, before=%s, after=%s", i, version, out.ResourceVersion) + } + case false: + if version == out.ResourceVersion { + t.Errorf("#%d: expect version change, but get the same version=%s", i, version) + } + } + storeObj = out + } +} + +func TestGuaranteedUpdateWithTTL(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + key := "/somekey" + + out := &example.Pod{} + err := store.GuaranteedUpdate(ctx, key, out, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + ttl := uint64(1) + return input, &ttl, nil + }) + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckEventType(t, watch.Deleted, w) +} + +func TestGuaranteedUpdateChecksStoredData(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + key := "/somekey" + + // serialize input into etcd with data that would be normalized by a write - in this case, leading + // and trailing whitespace + codec := codecs.LegacyCodec(examplev1.SchemeGroupVersion) + data, err := runtime.Encode(codec, input) + if err != nil { + t.Fatal(err) + } + resp, err := store.client.Put(ctx, key, "test! "+string(data)+" ") + if err != nil { + t.Fatal(err) + } + + // this update should write the canonical value to etcd because the new serialization differs + // from the stored serialization + input.ResourceVersion = strconv.FormatInt(resp.Header.Revision, 10) + out := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return input, nil, nil + }, input) + if err != nil { + t.Fatalf("Update failed: %v", err) + } + if out.ResourceVersion == strconv.FormatInt(resp.Header.Revision, 10) { + t.Errorf("guaranteed update should have updated the serialized data, got %#v", out) + } +} + +func TestGuaranteedUpdateWithConflict(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + errChan := make(chan error, 1) + var firstToFinish sync.WaitGroup + var secondToEnter sync.WaitGroup + firstToFinish.Add(1) + secondToEnter.Add(1) + + go func() { + err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + pod.Name = "foo-1" + secondToEnter.Wait() + return pod, nil + })) + firstToFinish.Done() + errChan <- err + }() + + updateCount := 0 + err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + if updateCount == 0 { + secondToEnter.Done() + firstToFinish.Wait() + } + updateCount++ + pod := obj.(*example.Pod) + pod.Name = "foo-2" + return pod, nil + })) + if err != nil { + t.Fatalf("Second GuaranteedUpdate error %#v", err) + } + if err := <-errChan; err != nil { + t.Fatalf("First 
GuaranteedUpdate error %#v", err) + } + + if updateCount != 2 { + t.Errorf("Should have conflict and called update func twice") + } +} + +func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + // First, update without a suggestion so originalPod is outdated + updatedPod := &example.Pod{} + err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + pod.Name = "foo-2" + return pod, nil + }), + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Second, update using the outdated originalPod as the suggestion. Return a conflict error when + // passed originalPod, and make sure that SimpleUpdate is called a second time after a live lookup + // with the value of updatedPod. + sawConflict := false + updatedPod2 := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, updatedPod2, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + if pod.Name != "foo-2" { + if sawConflict { + t.Fatalf("unexpected second conflict") + } + sawConflict = true + // simulated stale object - return a conflict + return nil, apierrors.NewConflict(example.SchemeGroupVersion.WithResource("pods").GroupResource(), "name", errors.New("foo")) + } + pod.Name = "foo-3" + return pod, nil + }), + originalPod, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if updatedPod2.Name != "foo-3" { + t.Errorf("unexpected pod name: %q", updatedPod2.Name) + } +} + +func TestTransformationFailure(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + ctx := context.Background() + + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{{ + key: "/one-level/test", + obj: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "bar"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + }, { + key: "/two-level/1/test", + obj: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "baz"}, + Spec: storagetests.DeepEqualSafePodSpec(), + }, + }} + for i, ps := range preset[:1] { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[:1][i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + // create a second resource with an invalid prefix + oldTransformer := store.transformer + store.transformer = prefixTransformer{prefix: []byte("otherprefix!")} + for i, ps := range preset[1:] { + preset[1:][i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + store.transformer = oldTransformer + + // only the first item is returned, and no error + var got example.PodList + if err := store.List(ctx, "/", "", storage.Everything, &got); err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := []example.Pod{*preset[0].storedObj}, got.Items; !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected: %s", diff.ObjectReflectDiff(e, a)) + } + + // Get should fail + if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); 
!storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate without suggestion should return an error + if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + // GuaranteedUpdate with suggestion should return an error if we don't change the object + if err := store.GuaranteedUpdate(ctx, preset[1].key, &example.Pod{}, false, nil, func(input runtime.Object, res storage.ResponseMeta) (output runtime.Object, ttl *uint64, err error) { + return input, nil, nil + }, preset[1].obj); err == nil { + t.Errorf("Unexpected error: %v", err) + } + + // Delete succeeds but reports an error because we cannot access the body + if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil); !storage.IsInternalError(err) { + t.Errorf("Unexpected error: %v", err) + } + + if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsNotFound(err) { + t.Errorf("Unexpected error: %v", err) + } +} + +func TestList(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + ctx := context.Background() + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{{ + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }} + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + tests := []struct { + prefix string + pred storage.SelectionPredicate + expectedOut []*example.Pod + }{{ // test List on existing key + prefix: "/one-level/", + pred: storage.Everything, + expectedOut: []*example.Pod{preset[0].storedObj}, + }, { // test List on non-existing key + prefix: "/non-existing/", + pred: storage.Everything, + expectedOut: nil, + }, { // test List with pod name matching + prefix: "/one-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + expectedOut: nil, + }, { // test List with multiple levels of directories and expect flattened result + prefix: "/two-level/", + pred: storage.Everything, + expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj}, + }} + + for i, tt := range tests { + out := &example.PodList{} + err := store.List(ctx, tt.prefix, "0", tt.pred, out) + if err != nil { + t.Fatalf("List failed: %v", err) + } + if len(tt.expectedOut) != len(out.Items) { + t.Errorf("#%d: length of list want=%d, get=%d", i, 
len(tt.expectedOut), len(out.Items)) + continue + } + for j, wantPod := range tt.expectedOut { + getPod := &out.Items[j] + if !reflect.DeepEqual(wantPod, getPod) { + t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) + } + } + } +} + +func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + ctx := context.Background() + return ctx, store, cluster +} + +// testPropogateStore helps propogates store with objects, automates key generation, and returns +// keys and stored objects. +func testPropogateStore(ctx context.Context, t *testing.T, store *store, obj *example.Pod) (string, *example.Pod) { + // Setup store with a key and grab the output for returning. + key := "/testkey" + setOutput := &example.Pod{} + err := store.Create(ctx, key, obj, setOutput, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + return key, setOutput +} + +func TestPrefix(t *testing.T) { + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + transformer := prefixTransformer{prefix: []byte("test!")} + testcases := map[string]string{ + "custom/prefix": "/custom/prefix", + "/custom//prefix//": "/custom/prefix", + "/registry": "/registry", + } + for configuredPrefix, effectivePrefix := range testcases { + store := newStore(cluster.RandClient(), false, codec, configuredPrefix, transformer) + if store.pathPrefix != effectivePrefix { + t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) + } + } +} diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go new file mode 100644 index 000000000000..052e4dc73a9a --- /dev/null +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -0,0 +1,374 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package etcd3 + +import ( + "fmt" + "reflect" + "strconv" + "sync" + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/integration" + "golang.org/x/net/context" + + apitesting "k8s.io/apimachinery/pkg/api/testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/apis/example" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/storage" +) + +func TestWatch(t *testing.T) { + testWatch(t, false) +} + +func TestWatchList(t *testing.T) { + testWatch(t, true) +} + +// It tests that +// - first occurrence of objects should notify Add event +// - update should trigger Modified event +// - update that gets filtered should trigger Deleted event +func testWatch(t *testing.T, recursive bool) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} + + tests := []struct { + key string + pred storage.SelectionPredicate + watchTests []*testWatchStruct + }{{ // create a key + key: "/somekey-1", + watchTests: []*testWatchStruct{{podFoo, true, watch.Added}}, + pred: storage.Everything, + }, { // create a key but obj gets filtered. Then update it with unfiltered obj + key: "/somekey-3", + watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}}, + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name=bar"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + }, { // update + key: "/somekey-4", + watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}}, + pred: storage.Everything, + }, { // delete because of being filtered + key: "/somekey-5", + watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}}, + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name!=bar"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, + }, + }} + for i, tt := range tests { + w, err := store.watch(ctx, tt.key, "0", tt.pred, recursive) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + var prevObj *example.Pod + for _, watchTest := range tt.watchTests { + out := &example.Pod{} + key := tt.key + if recursive { + key = key + "/item" + } + err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return watchTest.obj, nil + })) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + if watchTest.expectEvent { + expectObj := out + if watchTest.watchType == watch.Deleted { + expectObj = prevObj + expectObj.ResourceVersion = out.ResourceVersion + } + testCheckResult(t, i, watchTest.watchType, w, expectObj) + } + prevObj = out + } + w.Stop() + testCheckStop(t, i, w) + } +} + +func TestDeleteTriggerWatch(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer 
cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + if err := store.Delete(ctx, key, &example.Pod{}, nil); err != nil { + t.Fatalf("Delete failed: %v", err) + } + testCheckEventType(t, watch.Deleted, w) +} + +// TestWatchFromZero tests that +// - watch from 0 should sync up and grab the object added before +// - watch from 0 is able to return events for objects whose previous version has been compacted +func TestWatchFromZero(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) + + w, err := store.Watch(ctx, key, "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 0, watch.Added, w, storedObj) + w.Stop() + + // Update + out := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil + })) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Make sure when we watch from 0 we receive an ADDED event + w, err = store.Watch(ctx, key, "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 1, watch.Added, w, out) + w.Stop() + + // Update again + out = &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil + })) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Compact previous versions + revToCompact, err := strconv.Atoi(out.ResourceVersion) + if err != nil { + t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) + } + _, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) + if err != nil { + t.Fatalf("Error compacting: %v", err) + } + + // Make sure we can still watch from 0 and receive an ADDED event + w, err = store.Watch(ctx, key, "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 2, watch.Added, w, out) +} + +// TestWatchFromNoneZero tests that +// - watch from non-0 should just watch changes after given version +func TestWatchFromNoneZero(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + out := &example.Pod{} + store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, err + })) + testCheckResult(t, 0, watch.Modified, w, out) +} + +func TestWatchError(t *testing.T) { + codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + 
invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + ctx := context.Background() + w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")}) + validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil + })) + testCheckEventType(t, watch.Error, w) +} + +func TestWatchContextCancel(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + canceledCtx, cancel := context.WithCancel(ctx) + cancel() + // When we watch with a canceled context, we should detect that it's context canceled. + // We won't take it as error and also close the watcher. + w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything) + if err != nil { + t.Fatal(err) + } + + select { + case _, ok := <-w.ResultChan(): + if ok { + t.Error("ResultChan() should be closed") + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timeout after %v", wait.ForeverTestTimeout) + } +} + +func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { + origCtx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + ctx, cancel := context.WithCancel(origCtx) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) + // make resutlChan and errChan blocking to ensure ordering. + w.resultChan = make(chan watch.Event) + w.errChan = make(chan error) + // The event flow goes like: + // - first we send an error, it should block on resultChan. + // - Then we cancel ctx. The blocking on resultChan should be freed up + // and run() goroutine should return. 
+ var wg sync.WaitGroup + wg.Add(1) + go func() { + w.run() + wg.Done() + }() + w.errChan <- fmt.Errorf("some error") + cancel() + wg.Wait() +} + +func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + + w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix()) + + if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + e := <-w.ResultChan() + watchedDeleteObj := e.Object.(*example.Pod) + var wres clientv3.WatchResponse + wres = <-etcdW + + watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion) + if err != nil { + t.Fatalf("ParseWatchResourceVersion failed: %v", err) + } + if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision { + t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", + watchedDeleteRev, wres.Events[0].Kv.ModRevision) + } +} + +type testWatchStruct struct { + obj *example.Pod + expectEvent bool + watchType watch.EventType +} + +type testCodec struct { + runtime.Codec +} + +func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + return nil, nil, errTestingDecode +} + +func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) { + select { + case res := <-w.ResultChan(): + if res.Type != expectEventType { + t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) + } +} + +func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) { + select { + case res := <-w.ResultChan(): + if res.Type != expectEventType { + t.Errorf("#%d: event type want=%v, get=%v", i, expectEventType, res.Type) + return + } + if !reflect.DeepEqual(expectObj, res.Object) { + t.Errorf("#%d: obj want=\n%#v\nget=\n%#v", i, expectObj, res.Object) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("#%d: time out after waiting %v on ResultChan", i, wait.ForeverTestTimeout) + } +} + +func testCheckStop(t *testing.T, i int, w watch.Interface) { + select { + case e, ok := <-w.ResultChan(): + if ok { + var obj string + switch e.Object.(type) { + case *example.Pod: + obj = e.Object.(*example.Pod).Name + case *metav1.Status: + obj = e.Object.(*metav1.Status).Message + } + t.Errorf("#%d: ResultChan should have been closed. Event: %s. 
Object: %s", i, e.Type, obj) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("#%d: time out after waiting 1s on ResultChan", i) + } +} diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/discovery/restmapper.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/discovery/restmapper.go index 9651716bd130..6d1de8c1b19b 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/discovery/restmapper.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/discovery/restmapper.go @@ -96,8 +96,19 @@ func NewRESTMapper(groupResources []*APIGroupResources, versionInterfaces meta.V if !resource.Namespaced { scope = meta.RESTScopeRoot } - versionMapper.Add(gv.WithKind(resource.Kind), scope) - // TODO only do this if it supports listing + + // this is for legacy resources and servers which don't list singular forms. For those we must still guess. + if len(resource.SingularName) == 0 { + versionMapper.Add(gv.WithKind(resource.Kind), scope) + // TODO this is producing unsafe guesses that don't actually work, but it matches previous behavior + versionMapper.Add(gv.WithKind(resource.Kind+"List"), scope) + continue + } + + plural := gv.WithResource(resource.Name) + singular := gv.WithResource(resource.SingularName) + versionMapper.AddSpecific(gv.WithKind(resource.Kind), plural, singular, scope) + // TODO this is producing unsafe guesses that don't actually work, but it matches previous behavior versionMapper.Add(gv.WithKind(resource.Kind+"List"), scope) } // TODO why is this type not in discovery (at least for "v1") diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/pkg/apis/rbac/helpers.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/pkg/apis/rbac/helpers.go index 9895b484b03d..da8f1d9a158e 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/pkg/apis/rbac/helpers.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/pkg/apis/rbac/helpers.go @@ -346,7 +346,7 @@ func NewRoleBindingForClusterRole(roleName, namespace string) *RoleBindingBuilde // Groups adds the specified groups as the subjects of the RoleBinding. func (r *RoleBindingBuilder) Groups(groups ...string) *RoleBindingBuilder { for _, group := range groups { - r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: GroupKind, Name: group}) + r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: GroupKind, APIGroup: GroupName, Name: group}) } return r } @@ -354,7 +354,7 @@ func (r *RoleBindingBuilder) Groups(groups ...string) *RoleBindingBuilder { // Users adds the specified users as the subjects of the RoleBinding. 
func (r *RoleBindingBuilder) Users(users ...string) *RoleBindingBuilder { for _, user := range users { - r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: UserKind, Name: user}) + r.RoleBinding.Subjects = append(r.RoleBinding.Subjects, Subject{Kind: UserKind, APIGroup: GroupName, Name: user}) } return r } diff --git a/vendor/k8s.io/kubernetes/test/e2e/third-party.go b/vendor/k8s.io/kubernetes/test/e2e/third-party.go index 9f2e6da01af1..5eccbf5f73e0 100644 --- a/vendor/k8s.io/kubernetes/test/e2e/third-party.go +++ b/vendor/k8s.io/kubernetes/test/e2e/third-party.go @@ -72,7 +72,7 @@ var _ = Describe("ThirdParty resources [Flaky] [Disruptive]", func() { } Context("Simple Third Party", func() { - It("creating/deleting thirdparty objects works [Conformance]", func() { + It("creating/deleting thirdparty objects works", func() { defer func() { if err := f.ClientSet.Extensions().ThirdPartyResources().Delete(rsrc.Name, nil); err != nil { framework.Failf("failed to delete third party resource: %v", err)