
feat(job): remove useless code (#943)
* feat(job): remove useless code

* fix ut

* adjust code

* add more log

* fmt package
D0m021ng authored Nov 24, 2022
1 parent c16420b commit 7fb5097
Showing 4 changed files with 27 additions and 258 deletions.
57 changes: 9 additions & 48 deletions pkg/apiserver/controller/log/cluster_log_test.go
@@ -8,21 +8,15 @@ import (
 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/client-go/discovery"
-	"k8s.io/client-go/dynamic/dynamicinformer"
-	fakedynamicclient "k8s.io/client-go/dynamic/fake"
 	"k8s.io/client-go/kubernetes"
 	fakedclient "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
 
 	"github.com/PaddlePaddle/PaddleFlow/pkg/common/config"
 	"github.com/PaddlePaddle/PaddleFlow/pkg/common/k8s"
 	"github.com/PaddlePaddle/PaddleFlow/pkg/common/logger"
 	pfschema "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema"
-	kuberuntime "github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2"
+	runtime "github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2"
 	"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/client"
 	"github.com/PaddlePaddle/PaddleFlow/pkg/job/runtime_v2/framework"
 	"github.com/PaddlePaddle/PaddleFlow/pkg/model"
@@ -50,28 +44,16 @@ var clusterInfo = model.ClusterInfo{
 	NamespaceList: []string{"default", "n2", MockNamespace},
 }
 
-func newFakeDynamicClient(server *httptest.Server) *k8s.DynamicClientOption {
-	scheme := runtime.NewScheme()
-	dynamicClient := fakedynamicclient.NewSimpleDynamicClient(scheme)
-	fakeDiscovery := discovery.NewDiscoveryClientForConfigOrDie(&rest.Config{Host: server.URL})
-	return &k8s.DynamicClientOption{
-		DynamicClient:   dynamicClient,
-		DynamicFactory:  dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0),
-		DiscoveryClient: fakeDiscovery,
-		ClusterInfo:     &pfschema.Cluster{Name: "test-cluster"},
-	}
-}
-
 func TestGetPFJobLogs(t *testing.T) {
 	var server = httptest.NewServer(k8s.DiscoveryHandlerFunc)
 	defer server.Close()
-	dynamicClient := newFakeDynamicClient(server)
+	krc := client.NewFakeKubeRuntimeClient(server)
 	clientset := fakedclient.NewSimpleClientset()
 
 	//CreateRuntime
-	e1 := &kuberuntime.KubeRuntime{}
+	e1 := &runtime.KubeRuntime{}
 	patch4 := gomonkey.ApplyPrivateMethod(e1, "BuildConfig", func() (*rest.Config, error) {
-		return dynamicClient.Config, nil
+		return krc.Config, nil
 	})
 	defer patch4.Reset()
 
@@ -80,18 +62,8 @@ func TestGetPFJobLogs(t *testing.T) {
 	})
 	defer patch2.Reset()
 
-	krc := client.KubeRuntimeClient{
-		Client:           clientset,
-		DynamicClient:    dynamicClient.DynamicClient,
-		DynamicFactory:   dynamicClient.DynamicFactory,
-		DiscoveryClient:  dynamicClient.DiscoveryClient,
-		Config:           dynamicClient.Config,
-		ClusterInfo:      dynamicClient.ClusterInfo,
-		JobInformerMap:   make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
-		QueueInformerMap: make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
-	}
 	patch3 := gomonkey.ApplyFunc(client.CreateKubeRuntimeClient, func(_ *rest.Config, _ *pfschema.Cluster) (framework.RuntimeClientInterface, error) {
-		return &krc, nil
+		return krc, nil
 	})
 	defer patch3.Reset()
 
@@ -189,14 +161,14 @@ func TestGetPFJobLogs(t *testing.T) {
 func TestGetKubernetesResourceLogs(t *testing.T) {
 	var server = httptest.NewServer(k8s.DiscoveryHandlerFunc)
 	defer server.Close()
-	dynamicClient := newFakeDynamicClient(server)
+	krc := client.NewFakeKubeRuntimeClient(server)
 	clientset := fakedclient.NewSimpleClientset()
 
 	//CreateRuntime
-	e1 := &kuberuntime.KubeRuntime{}
+	e1 := &runtime.KubeRuntime{}
 
 	patch4 := gomonkey.ApplyPrivateMethod(e1, "BuildConfig", func() (*rest.Config, error) {
-		return dynamicClient.Config, nil
+		return krc.Config, nil
 	})
 	defer patch4.Reset()
 
@@ -210,17 +182,6 @@ func TestGetKubernetesResourceLogs(t *testing.T) {
 	})
 	defer patch3.Reset()
 
-	krc := client.KubeRuntimeClient{
-		Client:           clientset,
-		DynamicClient:    dynamicClient.DynamicClient,
-		DynamicFactory:   dynamicClient.DynamicFactory,
-		DiscoveryClient:  dynamicClient.DiscoveryClient,
-		Config:           dynamicClient.Config,
-		ClusterInfo:      dynamicClient.ClusterInfo,
-		JobInformerMap:   make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
-		QueueInformerMap: make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
-	}
-
 	driver.InitMockDB()
 	config.GlobalServerConfig = &config.ServerConfig{}
 	config.GlobalServerConfig.Job.IsSingleCluster = true
@@ -374,7 +335,7 @@ func TestGetKubernetesResourceLogs(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			if tt.args.clientEnable {
 				patch3 := gomonkey.ApplyFunc(client.CreateKubeRuntimeClient, func(_ *rest.Config, _ *pfschema.Cluster) (framework.RuntimeClientInterface, error) {
-					return &krc, nil
+					return krc, nil
 				})
 				defer patch3.Reset()
 			}
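The removed newFakeDynamicClient helper and the hand-built client.KubeRuntimeClient literals are folded into a single client.NewFakeKubeRuntimeClient(server) call. A minimal sketch of what that helper plausibly bundles, reconstructed from the code deleted above (the real implementation lives in pkg/job/runtime_v2/client and may differ):

package client

import (
	"net/http/httptest"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/dynamic/dynamicinformer"
	fakedynamicclient "k8s.io/client-go/dynamic/fake"
	fakedclient "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	pfschema "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema"
)

// NewFakeKubeRuntimeClient is sketched from the struct literal this commit
// deletes; the field set mirrors the old test setup, not necessarily the
// actual helper.
func NewFakeKubeRuntimeClient(server *httptest.Server) *KubeRuntimeClient {
	config := &rest.Config{Host: server.URL}
	dynamicClient := fakedynamicclient.NewSimpleDynamicClient(runtime.NewScheme())
	return &KubeRuntimeClient{
		Client:           fakedclient.NewSimpleClientset(),
		DynamicClient:    dynamicClient,
		DynamicFactory:   dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0),
		DiscoveryClient:  discovery.NewDiscoveryClientForConfigOrDie(config),
		Config:           config,
		ClusterInfo:      &pfschema.Cluster{Name: "test-cluster"},
		JobInformerMap:   make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
		QueueInformerMap: make(map[schema.GroupVersionKind]cache.SharedIndexInformer),
	}
}

Returning a pointer also explains the patched CreateKubeRuntimeClient stubs switching from "return &krc, nil" to "return krc, nil" above.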
135 changes: 10 additions & 125 deletions pkg/common/k8s/api.go
@@ -18,18 +18,10 @@ package k8s
 
 import (
 	"fmt"
-	"sync"
 
 	log "github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/client-go/discovery"
-	"k8s.io/client-go/discovery/cached/memory"
-	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/dynamic/dynamicinformer"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/restmapper"
 
 	commomschema "github.com/PaddlePaddle/PaddleFlow/pkg/common/schema"
 )
@@ -54,22 +46,16 @@ var (
 	ArgoWorkflowGVK = schema.GroupVersionKind{Group: "argoproj.io", Version: "v1alpha1", Kind: "Workflow"}
 
 	// GVKJobStatusMap contains GroupVersionKind and convertStatus function to sync job status
-	GVKJobStatusMap = map[schema.GroupVersionKind]GetStatusFunc{
-		VCJobGVK:        VCJobStatus,
-		SparkAppGVK:     SparkAppStatus,
-		PaddleJobGVK:    PaddleJobStatus,
-		PodGVK:          SingleJobStatus,
-		ArgoWorkflowGVK: ArgoWorkflowStatus,
-		PyTorchJobGVK:   PytorchJobStatus,
-		TFJobGVK:        TFJobStatus,
-		MXNetJobGVK:     MXNetJobStatus,
-		MPIJobGVK:       MPIJobStatus,
-		RayJobGVK:       RayJobStatus,
-	}
-	// GVKToQuotaType GroupVersionKind lists for PaddleFlow QuotaType
-	GVKToQuotaType = []schema.GroupVersionKind{
-		VCQueueGVK,
-		EQuotaGVK,
+	GVKJobStatusMap = map[schema.GroupVersionKind]bool{
+		SparkAppGVK:     true,
+		PaddleJobGVK:    true,
+		PodGVK:          true,
+		ArgoWorkflowGVK: true,
+		PyTorchJobGVK:   true,
+		TFJobGVK:        true,
+		MXNetJobGVK:     true,
+		MPIJobGVK:       true,
+		RayJobGVK:       true,
 	}
 )

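GVKJobStatusMap changes from map[schema.GroupVersionKind]GetStatusFunc to map[schema.GroupVersionKind]bool, so it now acts as a plain membership set; note that VCJobGVK is no longer listed, and the unchanged doc comment above still mentions the removed convertStatus functions. An illustrative caller (an assumption, not part of this diff) would gate status syncing with a simple lookup:

import (
	"k8s.io/apimachinery/pkg/runtime/schema"

	"github.com/PaddlePaddle/PaddleFlow/pkg/common/k8s"
)

// isSyncedJobKind is illustrative only: with the bool-valued map,
// membership is a plain lookup, and a missing kind yields false.
func isSyncedJobKind(gvk schema.GroupVersionKind) bool {
	return k8s.GVKJobStatusMap[gvk]
}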
@@ -129,67 +115,6 @@ type StatusInfo struct {
 	Message string
 }
 
-type GetStatusFunc func(interface{}) (StatusInfo, error)
-
-// DynamicClientOption for kubernetes dynamic client
-type DynamicClientOption struct {
-	DynamicClient   dynamic.Interface
-	DynamicFactory  dynamicinformer.DynamicSharedInformerFactory
-	DiscoveryClient discovery.DiscoveryInterface
-	Config          *rest.Config
-	ClusterInfo     *commomschema.Cluster
-	// GVKToGVR contains GroupVersionKind map to GroupVersionResource
-	GVKToGVR sync.Map
-}
-
-func CreateDynamicClientOpt(config *rest.Config, cluster *commomschema.Cluster) (*DynamicClientOption, error) {
-	dynamicClient, err := dynamic.NewForConfig(config)
-	if err != nil {
-		log.Errorf("init dynamic client failed. error:%s", err)
-		return nil, err
-	}
-	factory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
-	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
-	if err != nil {
-		log.Errorf("create discovery client failed: %v", err)
-		return nil, err
-	}
-	if cluster == nil {
-		log.Errorf("cluster info is nil")
-		return nil, fmt.Errorf("cluster info is nil")
-	}
-	return &DynamicClientOption{
-		DynamicClient:   dynamicClient,
-		DynamicFactory:  factory,
-		DiscoveryClient: discoveryClient,
-		Config:          config,
-		ClusterInfo:     cluster,
-	}, nil
-}
-
-func (dc *DynamicClientOption) GetGVR(gvk schema.GroupVersionKind) (meta.RESTMapping, error) {
-	gvr, ok := dc.GVKToGVR.Load(gvk.String())
-	if ok {
-		return gvr.(meta.RESTMapping), nil
-	}
-	return dc.findGVR(&gvk)
-}
-
-func (dc *DynamicClientOption) findGVR(gvk *schema.GroupVersionKind) (meta.RESTMapping, error) {
-	// DiscoveryClient queries API server about the resources
-	mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc.DiscoveryClient))
-	// Find GVR
-	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
-	if err != nil {
-		log.Warningf("find GVR with restMapping failed: %v", err)
-		return meta.RESTMapping{}, err
-	}
-	// Store GVR
-	log.Debugf("The GVR of GVK[%s] is [%s]", gvk.String(), mapping.Resource.String())
-	dc.GVKToGVR.Store(gvk.String(), *mapping)
-	return *mapping, nil
-}
-
 type PodStatusMessage struct {
 	Phase   v1.PodPhase `json:"phase,omitempty"`
 	Message string      `json:"message,omitempty"`
@@ -274,43 +199,3 @@ func GetTaskMessage(podStatus *v1.PodStatus) string {
 	}
 	return statusMessage.String()
 }
-
-func GetJobGVK(jobType commomschema.JobType, framework commomschema.Framework) (schema.GroupVersionKind, error) {
-	var gvk schema.GroupVersionKind
-	var err error
-	switch jobType {
-	case commomschema.TypeSingle:
-		gvk = PodGVK
-	case commomschema.TypeDistributed:
-		gvk, err = getDistributedJobGVK(framework)
-	case commomschema.TypeWorkflow:
-		gvk = ArgoWorkflowGVK
-	default:
-		err = fmt.Errorf("job type %s is not supported", jobType)
-	}
-	return gvk, err
-}
-
-func getDistributedJobGVK(framework commomschema.Framework) (schema.GroupVersionKind, error) {
-	var gvk schema.GroupVersionKind
-	var err error
-	switch framework {
-	case commomschema.FrameworkPaddle:
-		gvk = PaddleJobGVK
-	case commomschema.FrameworkSpark:
-		gvk = SparkAppGVK
-	case commomschema.FrameworkPytorch:
-		gvk = PyTorchJobGVK
-	case commomschema.FrameworkTF:
-		gvk = TFJobGVK
-	case commomschema.FrameworkMXNet:
-		gvk = MXNetJobGVK
-	case commomschema.FrameworkRay:
-		gvk = RayJobGVK
-	case commomschema.FrameworkMPI:
-		err = fmt.Errorf("framework %s is not implemented", framework)
-	default:
-		err = fmt.Errorf("framework %s is not supported", framework)
-	}
-	return gvk, err
-}
10 changes: 8 additions & 2 deletions pkg/common/k8s/quota.go
@@ -59,10 +59,11 @@ func init() {
 	if gpuNamePrefix != "" {
 		GPURNamePrefix = gpuNamePrefix
 	}
-	fmt.Printf("CORE_POD key: %s, IDX key: %s, GPU Name prefix: %s", GPUCorePodKey, GPUIdxKey, GPURNamePrefix)
+	fmt.Printf("CORE_POD key: %s, IDX key: %s, GPU Name prefix: %s\n", GPUCorePodKey, GPUIdxKey, GPURNamePrefix)
 }
 
 func SharedGPUIDX(pod *v1.Pod) int64 {
+	log.Debugf("pod %s/%s shared gpu idx: %v", pod.Namespace, pod.Name, pod.Annotations)
 	var deviceIDX int64 = -1
 	annotations := pod.Annotations
 	if annotations == nil {
@@ -80,13 +81,15 @@ func SharedGPUIDX(pod *v1.Pod) int64 {
 	}
 
 	idxs := strings.Split(idxStr, ",")
+	log.Debugf("split idxs: %v", idxs)
 	if corePodNum/50 == len(idxs) {
 		// is shared gpu
 		deviceIDX = 0
 		var deviceID int
 		for _, idx := range idxs {
 			deviceID, err = strconv.Atoi(idx)
 			if err != nil {
+				log.Warnf("convert str[%s] to int failed, err: %v", idx, err)
 				return -1
 			}
 			deviceIDX += GPUDeviceIDX(deviceID)
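The loop above sums GPUDeviceIDX(deviceID) over the comma-separated device indexes, and the pod only counts as sharing GPUs when corePodNum/50 equals the number of indexes, i.e. each shared device accounts for 50 core units. GPUDeviceIDX and GPUSharedDevices are not shown in this diff; a bit-flag encoding is one plausible reading, consistent with summing per-device values into a single int64 and decoding them back:

// Assumed encoding, not shown in this diff: one bit per physical GPU, so
// per-device values can be summed into a single int64 mask...
func GPUDeviceIDX(deviceID int) int64 {
	return 1 << uint(deviceID)
}

// ...and decoded back into the list of shared device IDs.
func GPUSharedDevices(gpuIDX int64) []int {
	devices := make([]int, 0)
	for id := 0; id < 64; id++ {
		if gpuIDX&(1<<uint(id)) != 0 {
			devices = append(devices, id)
		}
	}
	return devices
}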
@@ -113,13 +116,15 @@ func SubWithGPUX(total *pfResources.Resource, rr map[string]int64) map[string]interface{} {
 	if total == nil || rr == nil {
 		return make(map[string]interface{})
 	}
+	log.Debugf("total: %v, used: %v", total, rr)
 	var isShared = false
 	var sharedDevices []int
 	gpuIDX, find := rr[GPUIndexResources]
 	if find {
 		isShared = true
 		sharedDevices = GPUSharedDevices(gpuIDX)
 	}
+	log.Debugf("gpuIDX: %v, shared devices: %v", gpuIDX, sharedDevices)
 	// get used gpu and gpu core
 	var gpux, gpuxCore int64
 	for rName, rValue := range rr {
@@ -130,6 +135,7 @@ func SubWithGPUX(total *pfResources.Resource, rr map[string]int64) map[string]interface{} {
 			gpuxCore = rValue
 		}
 	}
+	log.Debugf("gpuIDX: %v, used gpu %d, and used gpu core %d", gpuIDX, gpux, gpuxCore)
 	// get gpu count and name
 	var gpuTotalCount int64
 	var gpuxName string
@@ -157,7 +163,7 @@ func SubWithGPUX(total *pfResources.Resource, rr map[string]int64) map[string]interface{} {
 			"100": idlegpux,
 			"50":  idleSharedGPUX,
 		}
-		log.Debugf("%s gpu %d, shared gpu %d", gpuxName, idlegpux, idleSharedGPUX)
+		log.Debugf("%s, idle gpu %d, idle shared gpu %d", gpuxName, idlegpux, idleSharedGPUX)
 	} else {
 		idlegpux = gpuTotalCount - gpux
 		gpuxResource["100"] = idlegpux
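The reworked debug messages also make SubWithGPUX's result easier to read: for a shareable GPU resource, idle capacity is bucketed under "100" (whole devices) and "50" (half-device slots). A sketch of how a caller might consume that map, assuming the nesting suggested by the gpuxResource keys above (the resource name and the pfResources import path are hypothetical):

import (
	"fmt"

	"github.com/PaddlePaddle/PaddleFlow/pkg/common/k8s"
	pfResources "github.com/PaddlePaddle/PaddleFlow/pkg/common/resources" // path assumed
)

// printIdleGPUX is illustrative only; the nested map shape follows the
// gpuxResource literal visible in this hunk.
func printIdleGPUX(total *pfResources.Resource, used map[string]int64) {
	for name, v := range k8s.SubWithGPUX(total, used) {
		if buckets, ok := v.(map[string]int64); ok {
			fmt.Printf("%s idle: %d whole GPUs, %d shared (50%%) slots\n",
				name, buckets["100"], buckets["50"])
		}
	}
}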
[Diff for the fourth changed file did not load; by the totals above it carries 0 additions and 83 deletions.]
