In our previous post, we concluded that S3 is much cheaper than an RDS Postgres installation.

The use case we had in mind was a download feature that uses Postgres and S3 to provide a seamless CSV file download experience.

We will build this functionality incrementally, starting simply by downloading Trade data from a Postgres database.

Setting up Postgres DB


version: "3.5"
services:
postgres:
image: "postgres:13"
environment:
- POSTGRES_PASSWORD=password
ports:
- "5432:5432"

With the above docker-compose.yaml file we can start a simple local Postgres DB. Docker and docker-compose should be installed, and the docker daemon should be running.

Running docker-compose up should give a similar result in your terminal:
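Later on we connect to this database through datasource.OpenDB(), whose implementation is not shown in this post. A minimal sketch of what it could look like, assuming the bun ORM (which we also use for the model further below) with its Postgres driver, and a DSN matching the docker-compose credentials:

package datasource

import (
    "database/sql"

    "github.com/uptrace/bun"
    "github.com/uptrace/bun/dialect/pgdialect"
    "github.com/uptrace/bun/driver/pgdriver"
)

// OpenDB connects to the local Postgres instance started by docker-compose.
// The DSN below is an assumption based on the credentials in docker-compose.yaml.
func OpenDB() *bun.DB {
    dsn := "postgres://postgres:password@localhost:5432/postgres?sslmode=disable"
    sqldb := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn)))
    return bun.NewDB(sqldb, pgdialect.New())
}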


download.go

This small program merges multiple read sources into a single write target. The main functionality is shown below:


func write(sink io.Writer, dataSources ...io.Reader) error {
    b := make([]byte, 1024)
    for _, ds := range dataSources {
        for {
            // read a chunk from the current data source
            n, err := ds.Read(b)
            if n > 0 {
                // forward whatever was read to the sink
                _, writerErr := sink.Write(b[:n])
                if writerErr != nil {
                    return writerErr
                }
            }
            if err != nil {
                if err == io.EOF {
                    // this source is exhausted, move on to the next one
                    break
                }
                return err
            }
        }
    }
    return nil
}
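Because write only depends on the io.Reader and io.Writer interfaces, it can be exercised without a database at all. A quick, self-contained example merging two in-memory sources into stdout (the contents are purely illustrative):

package main

import (
    "os"
    "strings"
)

func main() {
    first := strings.NewReader("line from the first source\r\n")
    second := strings.NewReader("line from the second source\r\n")

    // both sources end up, in order, in the same write target
    if err := write(os.Stdout, first, second); err != nil {
        panic(err)
    }
}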

data sources

Our Postgres dataSource should implement io.Reader:

type Reader interface {
    Read(p []byte) (n int, err error)
}

Postgres data source

Using a lightweight ORM (bun), we map our Postgres records to a slice of Trade structs:
type Trade struct {
    ID         int64 `bun:",pk,autoincrement"`
    DateTime   time.Time
    SymbolPair string
    Type       string // buy/sell
    Price      float64
    Amount     float64
    Fee        float64
    Total      float64
}

We retrieve the data with a paged, cursor-based approach:


func (p *PostgresReader) next(cursor int64) (entries []model.Trade, current *Cursor, err error) {
    if err = p.db.NewSelect().
        Model(&entries).
        Where("id > ?", cursor).
        Where("date_time >= ?", p.from).
        Where("date_time < ?", p.to).
        OrderExpr("id ASC").
        Limit(p.pageSize).
        Scan(p.ctx); err != nil {
        return nil, &Cursor{}, err
    }
    return entries, NewCursor(entries), nil
}
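The Cursor type and NewCursor are not shown in this post; a minimal sketch of what they could look like, tracking only the last id of the current page so the next call knows where to continue:

// Cursor remembers where the last fetched page ended.
// This is a sketch; the real type may carry more information.
type Cursor struct {
    End int64
}

// NewCursor derives the cursor from a page of entries ordered by id ASC.
func NewCursor(entries []model.Trade) *Cursor {
    if len(entries) == 0 {
        return &Cursor{}
    }
    return &Cursor{End: entries[len(entries)-1].ID}
}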

The Read method is expected to fill a []byte; we make use of an internal byte buffer:


func (p *PostgresReader) Read(data []byte) (n int, err error) {
    // is there still some data left in the buffer?
    if p.buffer.Len() > 0 {
        n, err = p.buffer.Read(data)
        if err != nil && err != io.EOF {
            return 0, err
        }
        // serve the remaining buffered bytes first
        return n, nil
    }

    // buffer is empty: fill it with the next page of records
    err = p.fillNewBuffer()
    if err != nil {
        return 0, err
    }

    n, err = p.buffer.Read(data)
    return
}
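Since PostgresReader now satisfies io.Reader, it plugs into anything in the standard library that consumes a reader. For instance, streaming one source straight into a file works with io.Copy (the variable and file names here are illustrative):

// reader is a *PostgresReader created with NewPostgresReader(...)
f, err := os.Create("trades.csv")
if err != nil {
    panic(err)
}
defer f.Close()

// io.Copy keeps calling reader.Read until it returns io.EOF
if _, err := io.Copy(f, reader); err != nil {
    panic(err)
}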

When the buffer is empty, we fill it again with the next page of Postgres records converted to CSV lines.

func (p *PostgresReader) fillNewBuffer() (err error) {
    // continue after the last id we have seen
    var cursor int64 = 0
    if p.dbCursor != nil {
        cursor = p.dbCursor.End
    }
    entries, c, err := p.next(cursor)
    if err != nil {
        return
    }
    p.dbCursor = c

    // create a new buffer holding this page as CSV lines
    buf := &bytes.Buffer{}
    for _, trade := range entries {
        _, err = buf.WriteString(strings.Join(trade.Values(), ","))
        if err != nil {
            return
        }
        buf.WriteString("\r\n")
    }
    p.buffer = buf

    return
}
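fillNewBuffer relies on a Values() helper on Trade that renders a record as its CSV columns. It is not shown in this post; a sketch of what it could look like, with assumed formatting choices (strconv and time are needed as imports):

// Values renders a Trade as CSV columns, in the same order as the struct fields.
func (t Trade) Values() []string {
    return []string{
        strconv.FormatInt(t.ID, 10),
        t.DateTime.Format(time.RFC3339),
        t.SymbolPair,
        t.Type,
        strconv.FormatFloat(t.Price, 'f', -1, 64),
        strconv.FormatFloat(t.Amount, 'f', -1, 64),
        strconv.FormatFloat(t.Fee, 'f', -1, 64),
        strconv.FormatFloat(t.Total, 'f', -1, 64),
    }
}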

main.go

After generating 1000 records in our database, we complete our first source, the Postgres data source:

// generate 1000 Trade records spread over the last year
from := time.Now().AddDate(-1, 0, 0)
to := time.Now()
fixtures.InitFixture(from, to, 1000)

db := datasource.OpenDB()

// a Postgres data source paging through the records 10 at a time
source1 := datasource.NewPostgresReader(context.Background(), db, 10, from, to)
download := NewDownload(time.Now(), time.Now())

fi, err := os.OpenFile("test.csv", os.O_WRONLY|os.O_CREATE, 0777)
if err != nil {
    log.Printf("Error in Create\n")
    panic(err)
}

if err := download.write(fi, source1); err != nil {
    panic(err)
}
fi.Close()
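fixtures.InitFixture is not shown either; a possible sketch, assuming it creates the table via bun and bulk-inserts generated Trade rows with timestamps spread between from and to (all generated values are purely illustrative):

func InitFixture(from, to time.Time, n int) {
    ctx := context.Background()
    db := datasource.OpenDB()

    // create the trades table if needed; bun derives the schema from the model
    if _, err := db.NewCreateTable().Model((*model.Trade)(nil)).IfNotExists().Exec(ctx); err != nil {
        panic(err)
    }

    // generate n trades with timestamps evenly spread between from and to
    step := to.Sub(from) / time.Duration(n)
    trades := make([]model.Trade, 0, n)
    for i := 0; i < n; i++ {
        trades = append(trades, model.Trade{
            DateTime:   from.Add(time.Duration(i) * step),
            SymbolPair: "BTC/EUR",
            Type:       "buy",
            Price:      100 + float64(i),
            Amount:     1,
            Fee:        0.1,
            Total:      100.1 + float64(i),
        })
    }

    // bulk insert the generated rows
    if _, err := db.NewInsert().Model(&trades).Exec(ctx); err != nil {
        panic(err)
    }
}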

result

The generated CSV file contains all 1000 records:


This concludes the conversion of a Postgres table to a CSV file using the io.Reader interface.

Next stop: the S3 data source, in our upcoming blog post toward a hybrid datastore :-)