Skip to content

Commit

Permalink
Add DateTimeTransformation to convert timestamp fields in records
Browse files Browse the repository at this point in the history
  • Loading branch information
HumbleBeck committed Jan 20, 2025
1 parent f69c6b4 commit 590be24
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
29 changes: 27 additions & 2 deletions connectors/aws-amplify-source/source_aws_amplify/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from typing import Union, Mapping, Any
from typing import Union, Mapping, Any, Optional
from datetime import datetime


@dataclass
Expand Down Expand Up @@ -75,3 +77,26 @@ def sign(self):
def __post_init__(self, parameters: Mapping[str, Any]):
if isinstance(self.assume_role, str):
self.assume_role = InterpolatedString(self.assume_role, parameters=parameters)


class DateTimeTransformation(RecordTransformation):
DATETIME_FIELDS = ["commitTime", "startTime", "endTime"]

def transform(
self,
record: Record,
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> Record:
for field in self.DATETIME_FIELDS:
if field in record and record[field] is not None:
# Convert scientific notation to float
timestamp = float(record[field])
# Convert to seconds if in milliseconds
if timestamp > 1e11:
timestamp = timestamp / 1000
# Convert to datetime string
dt = datetime.fromtimestamp(timestamp)
record[field] = dt.isoformat()
return record
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ definitions:

jobs_stream:
$ref: "#/definitions/base_stream"
transformations:
- source_aws_amplify.components.DateTimeTransformation
retriever:
$ref: "#/definitions/retriever"
requester:
Expand Down

0 comments on commit 590be24

Please sign in to comment.