diff --git a/internal/docbuf/docbuf.go b/internal/docbuf/docbuf.go index 89b8187..cc0610e 100644 --- a/internal/docbuf/docbuf.go +++ b/internal/docbuf/docbuf.go @@ -48,6 +48,14 @@ type DocumentBuffer interface { * removing them from either data structure. */ Read(int) ([]string, error) + /* Apply + * Takes a callable/closure/function that accepts a string (document) + * and returns a bool. Apply returns an error, or nil. + * Apply will begin applying this callable to each doc from most recent + * to least recent so long as the callable continues to return true. + * If the callable returns false, Apply ceases iteration and returns. + */ + Apply(func(string)bool) error /* Close * flushes and then prepares backing store for closure */ @@ -86,7 +94,7 @@ func (b *DocBuf) promote(bounds bool) error { return errors.New(PromoteToFullCacheError) } - doc, err := b.readDocumentsFromDisk(1, true) + doc, err := b.readDocumentsFromDisk(1, true, false) if err != nil { return err } @@ -135,26 +143,35 @@ func (b *DocBuf) writeToBackingStore(doc string) error { func (b *DocBuf) readDocumentsFromDisk( count int, truncate bool, + continues bool, ) ([]string, error) { docs := []string{} cursor := int64(0) + var err error if count < 1 { return docs, errors.New(BadReadRequestError) } - // catch store is empty - cursor, err := b.store.Seek(0, io.SeekEnd) - if b.endIndex == 0 || err != nil || cursor == 0 { - return docs, err - } - - // self repair? - if b.endIndex > cursor { - b.endIndex = cursor - } + if continues { + cursor, err = b.store.Seek(0, io.SeekCurrent) + if b.endIndex == 0 || err != nil || cursor == 0 { + return docs, err + } - cursor = b.endIndex - 1 + } else { + // catch store is empty + cursor, err = b.store.Seek(0, io.SeekEnd) + if b.endIndex == 0 || err != nil || cursor == 0 { + return docs, err + } + + // self repair? + if b.endIndex > cursor { + b.endIndex = cursor + } + cursor = b.endIndex - 1 + } for len(docs) < count && cursor >= 0 { doc := "" @@ -190,7 +207,7 @@ func (b *DocBuf) readDocumentsFromDisk( * each read will stop on the newline so make sure * we dont try to account for an empty document */ - if (doc != "\n") { + if len(doc) > 0 && doc != "\n" { str, err := base64.StdEncoding.DecodeString(doc) if err != nil { return docs, err @@ -338,7 +355,7 @@ func (b *DocBuf) Peek() (string, error) { if len(b.cache) > 0 { return b.cache[0], nil } else { - d, e := b.readDocumentsFromDisk(1, false) + d, e := b.readDocumentsFromDisk(1, false, false) if len(d) < 1 { return "", e } @@ -355,7 +372,7 @@ func (b *DocBuf) Read(count int) ([]string, error) { candidates := b.cache[:count - (delta)] if delta > 0 { - fromStorage, err := b.readDocumentsFromDisk(delta, false) + fromStorage, err := b.readDocumentsFromDisk(delta, false, false) if err != nil { return candidates, err } @@ -366,6 +383,61 @@ func (b *DocBuf) Read(count int) ([]string, error) { return candidates, nil } +/* Apply + * Takes a callable/closure/function that accepts a string (document) + * and returns a bool. Apply returns an error, or nil. + * Apply will begin applying this callable to each doc from most recent + * to least recent so long as the callable continues to return true. + * If the callable returns false, Apply ceases iteration and returns. + */ +func (b *DocBuf) Apply(f func(string)bool) error { + // iterate over internal cache applying function + for _, i := range b.cache { + if !f(i) { + return nil + } + } + + // begin iterating with readDocumentsFromDisk + first := true + flag := true + for flag { + doc, err := b.readDocumentsFromDisk(1, false, !first) + if err != nil { + return err + } + if len(doc) == 0 { + return nil + } else if len(doc) != 1 { + return errors.New("improper read from buffer") + } + + first = false + flag = f(doc[0]) + + if flag { + var c, d int64 + var err error + // since continue is on we need to bring the cursor back one + if c, err = b.store.Seek(0, io.SeekCurrent); err != nil { + return err + // dont seek earlier than 0 + } else if c < 1 { + break + } + if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil { + return err + } + + if c == d { + return errors.New("Seek failure!") + } + } + } + + return nil +} + /* Close * flushes and then prepares backing store for closure. * returns the end index into the underlying stream... diff --git a/internal/docbuf/docbuf_test.go b/internal/docbuf/docbuf_test.go index 0408070..48bf03e 100644 --- a/internal/docbuf/docbuf_test.go +++ b/internal/docbuf/docbuf_test.go @@ -302,3 +302,36 @@ func TestInitialize(t *testing.T) { t.Fatalf("error reading: %e", err) } } + +func TestApply(t *testing.T) { + backingStore := NewReadWriteSeekString() + backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo=")) + buf, e := NewDocumentBuffer(3, &backingStore) + if e != nil { + t.Fatalf("error making documentbuffer: %e", e) + } + + // test cached + if buf.Cached() != 3 { + t.Fatalf("expected 3 docs in cache") + } + + count := 0 + if err := buf.Apply(func(doc string) bool { + count += 1 + return true + }); err != nil || count != 6 { + t.Fatalf("error applying: %e", err) + } + + count = 0 + if err := buf.Apply(func(doc string) bool { + if doc == "2\n" { + return false + } + count += 1 + return true + }); err != nil || count != 4 { + t.Fatalf("error applying: %e", err) + } +}