In our previous post, we concluded that S3 is much cheaper than a RDS Postgres installation.

The use case we had in mind was to create a download functionality that uses Postgres and S3 to create a seamless CSV file download experience.

We incrementally create our functionality and we will start simply by downloading Trade data from a Postgres database

Setting up Postgres DB

version: "3.5"
image: "postgres:13"
- "5432:5432"

with the above docker-compose.yaml file we will start a simple local Postgres DB, docker & docker-compose should be installed and the docker daemon should be running.

docker-compose up should give a similar result in your terminal


This small program merges multiple read sources to a write target. The main functionality is depicted below:

func write(sink io.Writer, dataSources error {
b := make([]byte, 0, 1024)
for _, ds := range dataSources {
for {
n, err := ds.Read(b[0:1024])
if n > 0 {
_, writerErr := sink.Write(b[0:n])
if writerErr != nil {
return writerErr
if err != nil {
if err == io.EOF {
return err
return nil

data sources

Our Postgres dataSource should implement io.Reader:

type Reader interface {
Read(p []byte) (n int, err error)

Postgres data source

using a lightweight ORM mapping our Postgres records to Trade struct array
type Trade struct {
ID int64 `bun:",pk,autoincrement"`
DateTime time.Time
SymbolPair string
Type string // buy/sell
Price float64
Amount float64
Fee float64
Total float64

and using a paged approach for retrieving our data.

func (p *PostgresReader) next(cursor int64) (entries []model.Trade, current *Cursor, err error) {
if err = p.db.NewSelect().
Where("id > ?", cursor).
Where("date_time >= ?", p.from).
Where("date_time < ?",
OrderExpr("id ASC").
Scan(p.ctx); err != nil {
return nil, &Cursor{}, err
return entries, NewCursor(entries), nil

The read interface is expecting a []byte to fill, we make use of a byte buffer

func (p *PostgresReader) Read(data []byte) (n int, err error) {
// is there still some data left in the buffer
if p.buffer.Len() > 0 {
n, err = p.buffer.Read(data)
if err != nil && err != io.EOF {
return 0, err
// buffer empty, last read
return n, nil

// fill buffer with data to read
err = p.fillNewBuffer()
if err != nil {
return 0, err

n, err = p.buffer.Read(data)

when the buffer is empty we will fill it again with some Postgres records converted to CSV.

func (p *PostgresReader) fillNewBuffer() (err error) {
var cursor int64 = 0
if p.dbCursor != nil {
cursor = p.dbCursor.End
entries, c, err :=
if err != nil {
p.dbCursor = c

// create new buffer
buf := &bytes.Buffer{}
for _, trade := range entries {
_, err = buf.WriteString(strings.Join(trade.Values(), ","))
if err != nil {
p.buffer = buf



Generating 1000 records in our database, we complete our first source being a Postgres data source

from := time.Now().AddDate(-1, 0, 0)
to := time.Now()
fixtures.InitFixture(from, to, 1000)

db := datasource.OpenDB()

source1 := datasource.NewPostgresReader(context.Background(), db, 10, from, to)
download := NewDownload(time.Now(), time.Now())

fi, err := os.OpenFile("test.csv", os.O_WRONLY|os.O_CREATE, 0777)
if err != nil {
log.Printf("Error in Create\n")

download.write(fi, source1)


The generated CSV file containing all 1000 records

This concludes the conversion of a Postgres table to a CSV file using the io.Reader interface.

Next stop S3 data source in our upcoming blog toward a hybrid datastore :-)