In our previous post we created a data source from a Postgres DB; this post addresses the S3 part of downloading data to CSV.
S3 is one of the backbones of the AWS cloud and is really versatile: with S3 Select it can even act as a read-only database.
We are building our functionality incrementally, and we continue here by downloading Trade data from an S3 blob store.
For S3 we use MinIO as a local S3-compatible store.
Setting up S3 (MinIO)
s3:
  image: minio/minio
  ports:
    - "9000:9000"
    - "9001:9001"
  expose:
    - "9000"
    - "9001"
  volumes:
    - ./s3-data:/data
  environment:
    MINIO_ROOT_USER: minio_access_key
    MINIO_ROOT_PASSWORD: minio_secret_key
    MINIO_REGION: eu-west-1
  command: server /data --console-address ":9001"
With the above docker-compose.yaml file we start a simple local S3-compatible blob store. Docker and docker-compose should be installed, the Docker daemon should be running, and the s3-data folder should exist next to the compose file.
Running docker-compose up should give a similar result in your terminal:
(base) ➜ download docker-compose up
WARNING: Found orphan containers (download_postgres_1) for this project. If you removed or renamed this service in your compose file, you can run this command with the --remove-orphans flag to clean it up.
Creating download_s3_1 ... done
Attaching to download_s3_1
s3_1 | API: http://172.19.0.3:9000 http://127.0.0.1:9000
s3_1 |
s3_1 | Console: http://172.19.0.3:9001 http://127.0.0.1:9001
s3_1 |
s3_1 | Documentation: https://docs.min.io
s3_1 |
s3_1 | You are running an older version of MinIO released 5 days ago
s3_1 | Update: Run `mc admin update`
s3_1 |
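Before the reader below can select anything, the trade-store bucket and the trade.csv object have to exist in MinIO. You can click them together in the MinIO console on port 9001, or script it. The following is a minimal sketch with aws-sdk-go; the endpoint, the path-style flag and the local trade.csv path are my assumptions, while the credentials and region come from the compose file above:

package main

import (
    "log"
    "os"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

func main() {
    // point the SDK at the local MinIO container instead of AWS
    sess := session.Must(session.NewSession(&aws.Config{
        Region:           aws.String("eu-west-1"),
        Endpoint:         aws.String("http://localhost:9000"), // assumption: compose runs locally
        S3ForcePathStyle: aws.Bool(true),                      // MinIO needs path-style addressing
        Credentials:      credentials.NewStaticCredentials("minio_access_key", "minio_secret_key", ""),
    }))
    client := s3.New(sess)

    // create the bucket the reader expects
    if _, err := client.CreateBucket(&s3.CreateBucketInput{
        Bucket: aws.String("trade-store"),
        CreateBucketConfiguration: &s3.CreateBucketConfiguration{
            LocationConstraint: aws.String("eu-west-1"),
        },
    }); err != nil {
        log.Printf("create bucket: %v", err) // an already-exists error is fine locally
    }

    // upload the CSV that S3 Select will query
    f, err := os.Open("trade.csv") // assumption: the file sits next to this program
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    if _, err := client.PutObject(&s3.PutObjectInput{
        Bucket: aws.String("trade-store"),
        Key:    aws.String("trade.csv"),
        Body:   f,
    }); err != nil {
        log.Fatal(err)
    }
}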
S3 data source
func (s *S3Reader) Next() ([]ports.CsvRecord, error) {
    // start situation: no cursor yet, so we begin before the first Id
    if s.dbCursor == nil {
        s.dbCursor = model.NewCursor[int64](0, 0)
    }
    // select a partial slice of the CSV via S3 Select
    resp, err := s.client.SelectObjectContent(&s3.SelectObjectContentInput{
        // bucket name
        Bucket: aws.String("trade-store"),
        // CSV object key
        Key: aws.String("trade.csv"),
        // S3 Select expression: page past the last seen Id and stay inside the requested time window
        Expression: aws.String(fmt.Sprintf(`
            SELECT * FROM S3Object s WHERE s."Id" > %d
            AND CAST(s."date_time" AS TIMESTAMP)
            BETWEEN CAST('%s' AS TIMESTAMP)
            AND CAST('%s' AS TIMESTAMP)
            LIMIT %d`,
            s.dbCursor.End, s.from.Format(time.RFC3339), s.to.Format(time.RFC3339), s.pageSize)),
        // lets us use SQL in the expression above
        ExpressionType: aws.String(s3.ExpressionTypeSql),
        InputSerialization: &s3.InputSerialization{
            CSV: &s3.CSVInput{
                // "Use" lets us reference the header row's column names in the SELECT statement
                FileHeaderInfo: aws.String("Use"),
            },
        },
        OutputSerialization: &s3.OutputSerialization{
            CSV: &s3.CSVOutput{
                QuoteFields:          aws.String("ASNEEDED"),
                RecordDelimiter:      aws.String("\r\n"),
                FieldDelimiter:       aws.String(","),
                QuoteCharacter:       aws.String(`"`),
                QuoteEscapeCharacter: aws.String(`"`),
            },
        },
    })
    if err != nil {
        log.Printf("failed making API request, %v\n", err)
        return nil, err
    }
    defer resp.EventStream.Close()
    // collect the streamed record payloads into one buffer
    results := &bytes.Buffer{}
    for event := range resp.EventStream.Events() {
        switch e := event.(type) {
        case *s3.RecordsEvent:
            results.Write(e.Payload)
        case *s3.StatsEvent:
            fmt.Printf("Processed %d bytes\n", *e.Details.BytesProcessed)
        case *s3.EndEvent:
            // nothing to do: the event channel closes after the end event
        }
    }
    if err := resp.EventStream.Err(); err != nil {
        return nil, err
    }
    // conversion csv -> model.Trade
    resReader := csv.NewReader(results)
    entries := make([]model.Trade, 0)
    for {
        record, err := resReader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, err
        }
        trade := model.NewTrade(s.tm, record)
        if trade == nil {
            return nil, fmt.Errorf("could not map csv record %v to a trade", record)
        }
        s.dbCursor.End = trade.Identifier()
        entries = append(entries, *trade)
    }
    // advance the cursor so the next call pages past the rows we just returned
    s.dbCursor = NewCursorFromEntries[model.Trade, int64](entries)
    // conversion from model to interface array
    models := make([]ports.CsvRecord, len(entries))
    for i := range entries {
        models[i] = entries[i]
    }
    return models, nil
}
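The Next method references several fields on the receiver that this post does not show. For context, a shape that would fit that usage could look like the sketch below; the field types are inferred from how Next uses them, and the mapper type in particular is an assumption:

// S3Reader pages Trade rows out of a CSV object in S3 via S3 Select.
// Sketch only: the real definition lives in the project.
type S3Reader struct {
    client   *s3.S3               // aws-sdk-go S3 client pointed at MinIO
    dbCursor *model.Cursor[int64] // paging cursor; End holds the last Id returned
    from     time.Time            // lower bound of the requested time window
    to       time.Time            // upper bound of the requested time window
    pageSize int                  // LIMIT used in the S3 Select expression
    tm       model.TradeMapper    // mapper handed to model.NewTrade (type name assumed)
}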
For the CSV generation I created custom tags and some mapper functionality; that is perhaps a topic for another post.
The conversion from S3 -> Trade -> ports.CsvRecord is a bit of overkill, but I went for a generic implementation so that arbitrary data can be used in later setups.
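The element-by-element copy at the end of Next is there because Go will not convert a []model.Trade to a []ports.CsvRecord implicitly; each Trade only has to satisfy the interface. The interface itself is not shown in this post, but something along these lines would fit (an assumption, not the actual definition):

// CsvRecord is what the generic CSV writer consumes.
// Assumed shape: the real interface lives in the ports package.
type CsvRecord interface {
    Header() []string // CSV column names
    Values() []string // row values, in header order
}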
The next post combines the Postgres DB and S3 (MinIO) sources and seamlessly retrieves partitioned data over both.