Extend DocumentBuffer with apply interface
The existing DocumentBuffer interface provided no facility to iterate over events in order without caching them all in memory and pushing/popping them. This was deemed significantly unoptimal and so Apply() was born. When Apply() is called, DocumentBuffer will recieve a filter callable (functor, lambda, or closure) and will call it on all documents in order throughout the entire cache and store, without caching stored events or modifying the cache either. The filter Apply() uses will recieve one document and return true or false: true to continue iterating and false to stop iterating. Signed-off-by: Ava Affine <ava@sunnypup.io>
This commit is contained in:
parent
1ef8ff042f
commit
609c50ff7d
2 changed files with 120 additions and 15 deletions
|
|
@ -48,6 +48,14 @@ type DocumentBuffer interface {
|
||||||
* removing them from either data structure.
|
* removing them from either data structure.
|
||||||
*/
|
*/
|
||||||
Read(int) ([]string, error)
|
Read(int) ([]string, error)
|
||||||
|
/* Apply
|
||||||
|
* Takes a callable/closure/function that accepts a string (document)
|
||||||
|
* and returns a bool. Apply returns an error, or nil.
|
||||||
|
* Apply will begin applying this callable to each doc from most recent
|
||||||
|
* to least recent so long as the callable continues to return true.
|
||||||
|
* If the callable returns false, Apply ceases iteration and returns.
|
||||||
|
*/
|
||||||
|
Apply(func(string)bool) error
|
||||||
/* Close
|
/* Close
|
||||||
* flushes and then prepares backing store for closure
|
* flushes and then prepares backing store for closure
|
||||||
*/
|
*/
|
||||||
|
|
@ -86,7 +94,7 @@ func (b *DocBuf) promote(bounds bool) error {
|
||||||
return errors.New(PromoteToFullCacheError)
|
return errors.New(PromoteToFullCacheError)
|
||||||
}
|
}
|
||||||
|
|
||||||
doc, err := b.readDocumentsFromDisk(1, true)
|
doc, err := b.readDocumentsFromDisk(1, true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -135,16 +143,25 @@ func (b *DocBuf) writeToBackingStore(doc string) error {
|
||||||
func (b *DocBuf) readDocumentsFromDisk(
|
func (b *DocBuf) readDocumentsFromDisk(
|
||||||
count int,
|
count int,
|
||||||
truncate bool,
|
truncate bool,
|
||||||
|
continues bool,
|
||||||
) ([]string, error) {
|
) ([]string, error) {
|
||||||
docs := []string{}
|
docs := []string{}
|
||||||
cursor := int64(0)
|
cursor := int64(0)
|
||||||
|
var err error
|
||||||
|
|
||||||
if count < 1 {
|
if count < 1 {
|
||||||
return docs, errors.New(BadReadRequestError)
|
return docs, errors.New(BadReadRequestError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if continues {
|
||||||
|
cursor, err = b.store.Seek(0, io.SeekCurrent)
|
||||||
|
if b.endIndex == 0 || err != nil || cursor == 0 {
|
||||||
|
return docs, err
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
// catch store is empty
|
// catch store is empty
|
||||||
cursor, err := b.store.Seek(0, io.SeekEnd)
|
cursor, err = b.store.Seek(0, io.SeekEnd)
|
||||||
if b.endIndex == 0 || err != nil || cursor == 0 {
|
if b.endIndex == 0 || err != nil || cursor == 0 {
|
||||||
return docs, err
|
return docs, err
|
||||||
}
|
}
|
||||||
|
|
@ -153,8 +170,8 @@ func (b *DocBuf) readDocumentsFromDisk(
|
||||||
if b.endIndex > cursor {
|
if b.endIndex > cursor {
|
||||||
b.endIndex = cursor
|
b.endIndex = cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
cursor = b.endIndex - 1
|
cursor = b.endIndex - 1
|
||||||
|
}
|
||||||
|
|
||||||
for len(docs) < count && cursor >= 0 {
|
for len(docs) < count && cursor >= 0 {
|
||||||
doc := ""
|
doc := ""
|
||||||
|
|
@ -190,7 +207,7 @@ func (b *DocBuf) readDocumentsFromDisk(
|
||||||
* each read will stop on the newline so make sure
|
* each read will stop on the newline so make sure
|
||||||
* we dont try to account for an empty document
|
* we dont try to account for an empty document
|
||||||
*/
|
*/
|
||||||
if (doc != "\n") {
|
if len(doc) > 0 && doc != "\n" {
|
||||||
str, err := base64.StdEncoding.DecodeString(doc)
|
str, err := base64.StdEncoding.DecodeString(doc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return docs, err
|
return docs, err
|
||||||
|
|
@ -338,7 +355,7 @@ func (b *DocBuf) Peek() (string, error) {
|
||||||
if len(b.cache) > 0 {
|
if len(b.cache) > 0 {
|
||||||
return b.cache[0], nil
|
return b.cache[0], nil
|
||||||
} else {
|
} else {
|
||||||
d, e := b.readDocumentsFromDisk(1, false)
|
d, e := b.readDocumentsFromDisk(1, false, false)
|
||||||
if len(d) < 1 {
|
if len(d) < 1 {
|
||||||
return "", e
|
return "", e
|
||||||
}
|
}
|
||||||
|
|
@ -355,7 +372,7 @@ func (b *DocBuf) Read(count int) ([]string, error) {
|
||||||
candidates := b.cache[:count - (delta)]
|
candidates := b.cache[:count - (delta)]
|
||||||
|
|
||||||
if delta > 0 {
|
if delta > 0 {
|
||||||
fromStorage, err := b.readDocumentsFromDisk(delta, false)
|
fromStorage, err := b.readDocumentsFromDisk(delta, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return candidates, err
|
return candidates, err
|
||||||
}
|
}
|
||||||
|
|
@ -366,6 +383,61 @@ func (b *DocBuf) Read(count int) ([]string, error) {
|
||||||
return candidates, nil
|
return candidates, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Apply
|
||||||
|
* Takes a callable/closure/function that accepts a string (document)
|
||||||
|
* and returns a bool. Apply returns an error, or nil.
|
||||||
|
* Apply will begin applying this callable to each doc from most recent
|
||||||
|
* to least recent so long as the callable continues to return true.
|
||||||
|
* If the callable returns false, Apply ceases iteration and returns.
|
||||||
|
*/
|
||||||
|
func (b *DocBuf) Apply(f func(string)bool) error {
|
||||||
|
// iterate over internal cache applying function
|
||||||
|
for _, i := range b.cache {
|
||||||
|
if !f(i) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// begin iterating with readDocumentsFromDisk
|
||||||
|
first := true
|
||||||
|
flag := true
|
||||||
|
for flag {
|
||||||
|
doc, err := b.readDocumentsFromDisk(1, false, !first)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(doc) == 0 {
|
||||||
|
return nil
|
||||||
|
} else if len(doc) != 1 {
|
||||||
|
return errors.New("improper read from buffer")
|
||||||
|
}
|
||||||
|
|
||||||
|
first = false
|
||||||
|
flag = f(doc[0])
|
||||||
|
|
||||||
|
if flag {
|
||||||
|
var c, d int64
|
||||||
|
var err error
|
||||||
|
// since continue is on we need to bring the cursor back one
|
||||||
|
if c, err = b.store.Seek(0, io.SeekCurrent); err != nil {
|
||||||
|
return err
|
||||||
|
// dont seek earlier than 0
|
||||||
|
} else if c < 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if d, err = b.store.Seek(-1, io.SeekCurrent); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if c == d {
|
||||||
|
return errors.New("Seek failure!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
/* Close
|
/* Close
|
||||||
* flushes and then prepares backing store for closure.
|
* flushes and then prepares backing store for closure.
|
||||||
* returns the end index into the underlying stream...
|
* returns the end index into the underlying stream...
|
||||||
|
|
|
||||||
|
|
@ -302,3 +302,36 @@ func TestInitialize(t *testing.T) {
|
||||||
t.Fatalf("error reading: %e", err)
|
t.Fatalf("error reading: %e", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestApply(t *testing.T) {
|
||||||
|
backingStore := NewReadWriteSeekString()
|
||||||
|
backingStore.Write([]byte("MQo=\nMgo=\nMwo=\nNAo=\nNQo=\nNgo="))
|
||||||
|
buf, e := NewDocumentBuffer(3, &backingStore)
|
||||||
|
if e != nil {
|
||||||
|
t.Fatalf("error making documentbuffer: %e", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// test cached
|
||||||
|
if buf.Cached() != 3 {
|
||||||
|
t.Fatalf("expected 3 docs in cache")
|
||||||
|
}
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
if err := buf.Apply(func(doc string) bool {
|
||||||
|
count += 1
|
||||||
|
return true
|
||||||
|
}); err != nil || count != 6 {
|
||||||
|
t.Fatalf("error applying: %e", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
if err := buf.Apply(func(doc string) bool {
|
||||||
|
if doc == "2\n" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
count += 1
|
||||||
|
return true
|
||||||
|
}); err != nil || count != 4 {
|
||||||
|
t.Fatalf("error applying: %e", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue