In our previous post, we created a data source from a Postgres DB; this post addresses the S3 part of downloading data to CSV.


S3 is one of the backbones of the AWS cloud and is really versatile: with S3 Select, it can even act as a read-only DB.

We build our functionality incrementally and continue by downloading trade data from an S3 blob store.

For S3, we use MinIO as a local S3-compatible store.

Setting up S3 (MinIO)

```yaml
services:
  s3:
    image: minio/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    expose:
      - "9000"
      - "9001"
    volumes:
      - ./s3-data:/data
    environment:
      MINIO_ROOT_USER: minio_access_key
      MINIO_ROOT_PASSWORD: minio_secret_key
      MINIO_REGION: eu-west-1
    command: server /data --console-address ":9001"
```

With the docker-compose.yaml file above, we start a simple local S3-compatible blob store. Docker and docker-compose must be installed, the Docker daemon must be running, and the s3-data folder must exist next to the compose file.

Running docker-compose up should produce output similar to the following:

```
$ docker-compose up
WARNING: Found orphan containers (download_postgres_1) for this project. If you removed or renamed this service in your compose file, you can run this command with the --remove-orphans flag to clean it up.
Creating download_s3_1 ... done
Attaching to download_s3_1
s3_1  | API: http://172.19.0.3:9000  http://127.0.0.1:9000
s3_1  |
s3_1  | Console: http://172.19.0.3:9001 http://127.0.0.1:9001
s3_1  |
s3_1  | Documentation: https://docs.min.io
s3_1  |
s3_1  |  You are running an older version of MinIO released 5 days ago
s3_1  |  Update: Run `mc admin update`
s3_1  |
```


Log in to the MinIO console at http://localhost:9001 using the access key and secret key from the docker-compose file (MINIO_ROOT_USER and MINIO_ROOT_PASSWORD). Create a bucket named "trade-store" and upload a CSV file named trade.csv, with a header row, in the following layout:

```csv
Id,date_time,symbol_pair,type,price,amount,fee,total
1,2021-03-26T20:17:21Z,BTCUSDT,sell,1.60,109.41,0.18,175.73
```
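Because the file has a header row, columns can be addressed by name instead of by position, which is also what FileHeaderInfo "Use" enables in the S3 Select query later on. The following is a minimal, stdlib-only sketch of that idea; the Trade struct and parseTrades function are hypothetical simplifications, not the post's model.Trade or model.NewTrade.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
	"time"
)

// Trade is a hypothetical, simplified stand-in for the post's model.Trade;
// its fields follow the CSV header layout above.
type Trade struct {
	ID         int64
	DateTime   time.Time
	SymbolPair string
	Type       string
	Price      float64
	Amount     float64
	Fee        float64
	Total      float64
}

// parseTrades reads CSV text whose first row is the header and maps each
// data row to a Trade by column name rather than by position.
func parseTrades(data string) ([]Trade, error) {
	rows, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return nil, fmt.Errorf("missing header row")
	}
	// index columns by header name so column order does not matter
	col := make(map[string]int, len(rows[0]))
	for i, name := range rows[0] {
		col[name] = i
	}
	for _, name := range []string{"Id", "date_time", "symbol_pair", "type", "price", "amount", "fee", "total"} {
		if _, ok := col[name]; !ok {
			return nil, fmt.Errorf("missing column %q", name)
		}
	}
	trades := make([]Trade, 0, len(rows)-1)
	for _, r := range rows[1:] {
		var t Trade
		if t.ID, err = strconv.ParseInt(r[col["Id"]], 10, 64); err != nil {
			return nil, err
		}
		if t.DateTime, err = time.Parse(time.RFC3339, r[col["date_time"]]); err != nil {
			return nil, err
		}
		t.SymbolPair = r[col["symbol_pair"]]
		t.Type = r[col["type"]]
		// parse the four numeric columns into their struct fields
		for name, dst := range map[string]*float64{"price": &t.Price, "amount": &t.Amount, "fee": &t.Fee, "total": &t.Total} {
			if *dst, err = strconv.ParseFloat(r[col[name]], 64); err != nil {
				return nil, err
			}
		}
		trades = append(trades, t)
	}
	return trades, nil
}

func main() {
	const sample = "Id,date_time,symbol_pair,type,price,amount,fee,total\n" +
		"1,2021-03-26T20:17:21Z,BTCUSDT,sell,1.60,109.41,0.18,175.73\n"
	trades, err := parseTrades(sample)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", trades[0])
}
```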

The end result in the browser should look something like this:



S3 data source

The code is annotated with comments to explain each step.

```go
// Imports used by Next(); the project's own model and ports packages are
// also required, but their import paths are not shown in this post.
import (
	"bytes"
	"encoding/csv"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

func (s *S3Reader) Next() ([]ports.CsvRecord, error) {
	// start situation: nothing read yet, so begin before the first Id
	if s.dbCursor == nil {
		s.dbCursor = model.NewCursor[int64](0, 0)
	}

	// select a page of rows from the CSV with S3 Select
	resp, err := s.client.SelectObjectContent(&s3.SelectObjectContentInput{
		// bucket name
		Bucket: aws.String("trade-store"),
		// CSV object inside the bucket
		Key: aws.String("trade.csv"),
		// S3 Select query: CSV values are strings, so Id is cast before the
		// numeric comparison; paging resumes after the last Id read
		Expression: aws.String(fmt.Sprintf(`
			SELECT * FROM S3Object s WHERE CAST(s."Id" AS INT) > %d
			AND CAST(s."date_time" AS TIMESTAMP)
			BETWEEN CAST('%s' AS TIMESTAMP)
			AND CAST('%s' AS TIMESTAMP)
			LIMIT %d`,
			s.dbCursor.End, s.from.Format(time.RFC3339), s.to.Format(time.RFC3339), s.pageSize)),
		// the expression above is SQL
		ExpressionType: aws.String(s3.ExpressionTypeSql),
		InputSerialization: &s3.InputSerialization{
			CSV: &s3.CSVInput{
				// use the header row, so columns can be referenced by name
				FileHeaderInfo: aws.String(s3.FileHeaderInfoUse),
			},
		},
		OutputSerialization: &s3.OutputSerialization{
			CSV: &s3.CSVOutput{
				QuoteFields:          aws.String("ASNEEDED"),
				RecordDelimiter:      aws.String("\r\n"),
				FieldDelimiter:       aws.String(","),
				QuoteCharacter:       aws.String(`"`),
				QuoteEscapeCharacter: aws.String(`"`),
			},
		},
	})
	if err != nil {
		log.Printf("failed making API request, %v\n", err)
		return nil, err
	}
	defer resp.EventStream.Close()

	// collect the selected records from the event stream
	results := &bytes.Buffer{}
	for event := range resp.EventStream.Events() {
		switch e := event.(type) {
		case *s3.RecordsEvent:
			results.Write(e.Payload)
		case *s3.StatsEvent:
			fmt.Printf("Processed %d bytes\n", *e.Details.BytesProcessed)
		case *s3.EndEvent:
			// nothing to do: the range loop ends when the stream closes
			// (a bare break here would only leave the switch, not the loop)
		}
	}
	// surface errors that occurred while reading the event stream
	if err := resp.EventStream.Err(); err != nil {
		return nil, err
	}

	// conversion csv -> model.Trade
	resReader := csv.NewReader(results)
	entries := make([]model.Trade, 0)
	for {
		record, err := resReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		trade := model.NewTrade(s.tm, record)
		if trade == nil {
			return nil, fmt.Errorf("could not convert record %v to a trade", record)
		}
		// remember the last Id so the next call continues after it
		s.dbCursor.End = trade.Identifier()
		entries = append(entries, *trade)
	}

	s.dbCursor = NewCursorFromEntries[model.Trade, int64](entries)

	// conversion from model to interface slice
	models := make([]ports.CsvRecord, len(entries))
	for i := range entries {
		models[i] = entries[i]
	}
	return models, nil
}
```

For the CSV generation, I created custom struct tags and some mapper functionality; that is perhaps something for another post.

The conversion S3 -> Trade -> ports.CsvRecord is a bit of overkill, but I went for a generic implementation so that arbitrary data can be used in later setups.
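The final loop in Next() exists because Go slices are not covariant: a []model.Trade cannot be passed where a []ports.CsvRecord is expected, even if Trade implements the interface, so each element has to be copied into an interface slice. A generic helper can do that once for any record type. In this sketch, CsvRecord, its Header/Values methods, Trade, and toRecords are hypothetical stand-ins, since the post does not show the real ports.CsvRecord interface.

```go
package main

import "fmt"

// CsvRecord is a hypothetical stand-in for ports.CsvRecord; the real
// interface's methods are not shown in the post.
type CsvRecord interface {
	Header() []string
	Values() []string
}

// Trade is a hypothetical, minimal record type.
type Trade struct {
	ID     int64
	Symbol string
}

func (t Trade) Header() []string { return []string{"Id", "symbol_pair"} }
func (t Trade) Values() []string { return []string{fmt.Sprint(t.ID), t.Symbol} }

// toRecords copies a slice of any concrete record type into a []CsvRecord,
// because []Trade is not assignable to []CsvRecord in Go.
func toRecords[T CsvRecord](entries []T) []CsvRecord {
	out := make([]CsvRecord, len(entries))
	for i, e := range entries {
		out[i] = e
	}
	return out
}

func main() {
	recs := toRecords([]Trade{{ID: 1, Symbol: "BTCUSDT"}})
	fmt.Println(recs[0].Header(), recs[0].Values())
}
```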

The next post combines the Postgres DB and S3 (MinIO) and seamlessly retrieves partitioned data across both sources.