Skip to content

Commit

Permalink
add versionsApi
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyolo committed Mar 24, 2024
1 parent 7d3f316 commit 5b25436
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 5 deletions.
1 change: 0 additions & 1 deletion streampipes-client-go/examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func main() {
log.Fatal(err)
}
dataSeries.Print()

/*
output format:
Expand Down
46 changes: 45 additions & 1 deletion streampipes-client-go/streampipes/data_lake_measure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewDataLakeMeasures(clientConfig config.StreamPipesClientConfig) *DataLakeM
}
}

// AllDataLakeMeasure retrieves all the measures from the Data Lake.
// AllDataLakeMeasure retrieves a list of all measurements series from the Data Lake.
func (d *DataLakeMeasure) AllDataLakeMeasure() ([]data_lake.DataLakeMeasure, error) {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", nil)
Expand Down Expand Up @@ -74,6 +74,27 @@ func (d *DataLakeMeasure) AllDataLakeMeasure() ([]data_lake.DataLakeMeasure, err
return dataLakeMeasures, nil
}

// DeleteDataLakeMeasurements removes all stored measurement series form Data Lake.
func (d *DataLakeMeasure) DeleteDataLakeMeasurements() error {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", nil)
log.Printf("Delete data from: %s", endPointUrl)

response, err := d.executeRequest("DELETE", endPointUrl)
if err != nil {
return err
}

if response.StatusCode != http.StatusOK {
err = d.handleStatusCode(response)
if err != nil {
return err
}
}

return nil
}

// GetSingleDataLakeMeasure retrieves a specific measure from the Data Lake.
func (d *DataLakeMeasure) GetSingleDataLakeMeasure(elementId string) (data_lake.DataLakeMeasure, error) {

Expand Down Expand Up @@ -106,6 +127,27 @@ func (d *DataLakeMeasure) GetSingleDataLakeMeasure(elementId string) (data_lake.
return dataLakeMeasure, nil
}

// DeleteSingleDataLakeMeasure deletes a specific measure from the Data Lake.
func (d *DataLakeMeasure) DeleteSingleDataLakeMeasure(elementId string) error {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measure", []string{elementId})
log.Printf("Delete data from: %s", endPointUrl)

response, err := d.executeRequest("DELETE", endPointUrl)
if err != nil {
return err
}

if response.StatusCode != http.StatusOK {
err = d.handleStatusCode(response)
if err != nil {
return err
}
}

return nil
}

// GetSingleDataSeries retrieves the measurement series for the specified measureId from the Data Lake.
// Currently not supporting parameter queries.
// The measureId can also be considered measureName.
Expand Down Expand Up @@ -141,6 +183,7 @@ func (d *DataLakeMeasure) GetSingleDataSeries(measureId string) (*data_lake.Data
}

// ClearDataLakeMeasureData removes data from a single measurement series with given id.
// The measureId can also be considered measureName.
func (d *DataLakeMeasure) ClearDataLakeMeasureData(measureId string) error {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId})
Expand All @@ -163,6 +206,7 @@ func (d *DataLakeMeasure) ClearDataLakeMeasureData(measureId string) error {
}

// DeleteDataLakeMeasure drops a single measurement series with given id from Data Lake and remove related event property.
// The measureId can also be considered measureName.
func (d *DataLakeMeasure) DeleteDataLakeMeasure(measureId string) error {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId, "drop"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package serializer
import (
"encoding/json"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/data_lake"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/streampipes_version"
)

type Deserializer interface {
Expand All @@ -36,7 +37,7 @@ func NewDataLakeMeasuresDeserializer() *DataLakeMeasuresDeserializer {
return &DataLakeMeasuresDeserializer{}
}

func (u *DataLakeMeasuresDeserializer) Unmarshal(data []byte) (interface{}, error) {
func (d *DataLakeMeasuresDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataLakeMeasures []data_lake.DataLakeMeasure
err := json.Unmarshal(data, &dataLakeMeasures)
if err != nil {
Expand All @@ -51,7 +52,7 @@ func NewDataLakeMeasureDeserializer() *DataLakeMeasureDeserializer {
return &DataLakeMeasureDeserializer{}
}

func (u *DataLakeMeasureDeserializer) Unmarshal(data []byte) (interface{}, error) {
func (d *DataLakeMeasureDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataLakeMeasure data_lake.DataLakeMeasure
err := json.Unmarshal(data, &dataLakeMeasure)
if err != nil {
Expand All @@ -66,11 +67,26 @@ func NewDataSeriesDeserializer() *DataSeriesDeserializer {
return &DataSeriesDeserializer{}
}

func (u *DataSeriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
func (d *DataSeriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataSeries data_lake.DataSeries
err := json.Unmarshal(data, &dataSeries)
if err != nil {
return nil, err
}
return dataSeries, nil
}

type StreamPipesVersionDeserializer struct{}

func NewStreamPipesVersionDeserializer() *StreamPipesVersionDeserializer {
return &StreamPipesVersionDeserializer{}
}

func (d *StreamPipesVersionDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataSeries streampipes_version.Versions
err := json.Unmarshal(data, &dataSeries)
if err != nil {
return nil, err
}
return dataSeries, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package streampipes_version

type Versions struct {
BackendVersion string `json:"backendVersion"`
ItemVersions []string `json:"itemVersions"`
}
5 changes: 5 additions & 0 deletions streampipes-client-go/streampipes/streampipes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ func (s *StreamPipesClient) DataLakeMeasures() *DataLakeMeasure {

return NewDataLakeMeasures(s.config)
}

func (s *StreamPipesClient) StreamPipesVersion() *Versions {

return NewVersion(s.config)
}
70 changes: 70 additions & 0 deletions streampipes-client-go/streampipes/streampipes_version_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package streampipes

import (
"github.com/apache/streampipes/streampipes-client-go/streampipes/config"
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/serializer"
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/util"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/streampipes_version"
"io"
"log"
"net/http"
)

type Versions struct {
endpoint
}

func NewVersion(clientConfig config.StreamPipesClientConfig) *Versions {

return &Versions{
endpoint{config: clientConfig},
}
}

// GetStreamPipesVersion provides health-check and information about current backend version.
func (d *Versions) GetStreamPipesVersion() (streampipes_version.Versions, error) {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v2/info/versions", nil)
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
if err != nil {
return streampipes_version.Versions{}, err
}

if response.StatusCode != http.StatusOK {
err = d.handleStatusCode(response)
if err != nil {
return streampipes_version.Versions{}, err
}
}

body, err := io.ReadAll(response.Body)
if err != nil {
return streampipes_version.Versions{}, err
}

unmarshalData, err := serializer.NewStreamPipesVersionDeserializer().Unmarshal(body)
if err != nil {
return streampipes_version.Versions{}, err
}
version := unmarshalData.(streampipes_version.Versions)
return version, nil
}

0 comments on commit 5b25436

Please sign in to comment.