Browse Source

Initial commit

Jan Sušnik 5 years ago
commit
ce1536175d
9 changed files with 553 additions and 0 deletions
  1. 9 0
      README.md
  2. 38 0
      czlib_test.go
  3. 96 0
      reader.go
  4. 43 0
      reader_test.go
  5. 114 0
      writer.go
  6. 41 0
      writer_test.go
  7. 60 0
      zstream.c
  8. 134 0
      zstream.go
  9. 18 0
      zstream.h

+ 9 - 0
README.md

@@ -0,0 +1,9 @@
+# czlib
+
+C zlib wrapper for Go language based on YouTube's Vitess cgzip wrapper ([src](https://github.com/vitessio/vitess/tree/master/go/cgzip), [docs](https://godoc.org/github.com/youtube/vitess/go/cgzip)).
+
+## Examples
+
+- For complete example of compressing and decompressing take a look at `czlib_test.go` file.
+- For example of compressing take a look at `writer_test.go` file.
+- For example of decompressing take a look at `reader_test.go` file.

+ 38 - 0
czlib_test.go

@@ -0,0 +1,38 @@
+package czlib
+
+import (
+	"bytes"
+	"io"
+	"testing"
+)
+
+func checkFatalError(t *testing.T, err error) {
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestDeflateInflate(t *testing.T) {
+	compressInput := "hello, world\n"
+	var b bytes.Buffer
+
+	w := NewWriter(&b)
+	_, err := w.Write([]byte(compressInput))
+	checkFatalError(t, err)
+	err = w.Close()
+	checkFatalError(t, err)
+
+	br := bytes.NewReader(b.Bytes())
+	r, err := NewReader(br)
+	checkFatalError(t, err)
+
+	buf := &bytes.Buffer{}
+	_, err = io.Copy(buf, r)
+	checkFatalError(t, err)
+	err = r.Close()
+	checkFatalError(t, err)
+
+	if buf.String() != "hello, world\n" {
+		t.Errorf("output differs from expected")
+	}
+}

+ 96 - 0
reader.go

@@ -0,0 +1,96 @@
+package czlib
+
+import (
+	"fmt"
+	"io"
+)
+
+type reader struct {
+	r      io.Reader
+	in     []byte
+	strm   *zstream
+	err    error
+	skipIn bool
+}
+
+// NewReader with default options
+func NewReader(r io.Reader) (io.ReadCloser, error) {
+	return NewReaderCustom(r, MAX_WBITS)
+}
+
+// NewReaderCustom for specifying window bits flag
+func NewReaderCustom(r io.Reader, windowBits int) (io.ReadCloser, error) {
+	return NewReaderCustomBuffer(r, windowBits, DEFAULT_COMPRESSED_BUFFER_SIZE)
+}
+
+// NewReaderCustomBuffer for specifying window bits flag and buffer size
+func NewReaderCustomBuffer(r io.Reader, windowBits, bufferSize int) (io.ReadCloser, error) {
+	z := &reader{r: r, in: make([]byte, bufferSize), strm: zstreamAlloc()}
+	if err := z.strm.inflateInit(windowBits); err != nil {
+		return nil, err
+	}
+	return z, nil
+}
+
+// Read bytes for decompression
+func (z *reader) Read(p []byte) (int, error) {
+	if z.err != nil {
+		return 0, z.err
+	} else if len(p) == 0 {
+		return 0, nil
+	}
+
+	z.strm.setOutBuf(p, len(p))
+
+	for {
+		if !z.skipIn && z.strm.availIn() == 0 {
+			var n int
+			n, z.err = z.r.Read(z.in)
+
+			if n > 0 && z.err == io.EOF {
+				z.err = nil
+			}
+
+			if (z.err != nil && z.err != io.EOF) || (n == 0 && z.err == io.EOF) {
+				z.strm.inflateEnd()
+				return 0, z.err
+			}
+
+			z.strm.setInBuf(z.in, n)
+		} else {
+			z.skipIn = false
+		}
+
+		ret, err := z.strm.inflate(Z_NO_FLUSH)
+		if err != nil {
+			z.err = err
+			z.strm.inflateEnd()
+			return 0, z.err
+		}
+
+		have := len(p) - z.strm.availOut()
+		if have > 0 {
+			z.skipIn = ret == Z_OK && z.strm.availOut() == 0
+			return have, z.err
+		}
+	}
+}
+
+// Close reader
+func (z *reader) Close() error {
+	if z.strm == nil {
+		panic(fmt.Errorf("czlib: stream closed"))
+	}
+	defer func() {
+		z.strm.inflateEnd()
+		zstreamFree(z.strm)
+		z.strm = nil
+	}()
+
+	if z.err != io.EOF {
+		return z.err
+	}
+
+	z.err = io.EOF
+	return nil
+}

+ 43 - 0
reader_test.go

@@ -0,0 +1,43 @@
+package czlib
+
+import (
+	"bytes"
+	"io"
+	"testing"
+)
+
+func TestInflateDefault(t *testing.T) {
+	defaultInput := []byte{120, 156, 203, 72, 205, 201, 201, 215, 81,
+		40, 207, 47, 202, 73, 225, 2, 0, 33, 231, 4, 147}
+	b := bytes.NewReader(defaultInput)
+	r, err := NewReader(b)
+	checkFatalError(t, err)
+
+	buf := &bytes.Buffer{}
+	_, err = io.Copy(buf, r)
+	checkFatalError(t, err)
+	err = r.Close()
+	checkFatalError(t, err)
+
+	if buf.String() != "hello, world\n" {
+		t.Errorf("output differs from expected")
+	}
+}
+
+func TestInflateBest(t *testing.T) {
+	bestInput := []byte{120, 218, 203, 72, 205, 201, 201, 215, 81,
+		40, 207, 47, 202, 73, 225, 2, 0, 33, 231, 4, 147}
+	b := bytes.NewReader(bestInput)
+	r, err := NewReader(b)
+	checkFatalError(t, err)
+
+	buf := &bytes.Buffer{}
+	_, err = io.Copy(buf, r)
+	checkFatalError(t, err)
+	err = r.Close()
+	checkFatalError(t, err)
+
+	if buf.String() != "hello, world\n" {
+		t.Errorf("output differs from expected")
+	}
+}

+ 114 - 0
writer.go

@@ -0,0 +1,114 @@
+package czlib
+
+import (
+	"fmt"
+	"io"
+)
+
+// DEFAULT_COMPRESSED_BUFFER_SIZE while compressing/decompressing
+const DEFAULT_COMPRESSED_BUFFER_SIZE = 32 * 1024
+
+// Writer is intended for compression logic
+type Writer struct {
+	w    io.Writer
+	out  []byte
+	strm *zstream
+	err  error
+}
+
+// NewWriter with default options
+func NewWriter(w io.Writer) *Writer {
+	z, _ := NewWriterCustom(w, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY)
+	return z
+}
+
+// NewWriterCustom for specifying custom zlib flags
+func NewWriterCustom(w io.Writer, level, method, windowBits, memLevel, strategy int) (*Writer, error) {
+	return NewWriterCustomBuffer(w, level, method, windowBits, memLevel, strategy, DEFAULT_COMPRESSED_BUFFER_SIZE)
+}
+
+// NewWriterCustomBuffer for specifying custom zlib flags and buffer size
+func NewWriterCustomBuffer(w io.Writer, level, method, windowBits, memLevel, strategy, bufferSize int) (*Writer, error) {
+	z := &Writer{w: w, out: make([]byte, bufferSize), strm: zstreamAlloc()}
+	if err := z.strm.deflateInit(level, method, windowBits, memLevel, strategy); err != nil {
+		return nil, err
+	}
+	return z, nil
+}
+
+func (z *Writer) write(p []byte, flush int) int {
+	if len(p) == 0 {
+		z.strm.setInBuf(nil, 0)
+	} else {
+		z.strm.setInBuf(p, len(p))
+	}
+
+	for {
+		z.strm.setOutBuf(z.out, len(z.out))
+		z.strm.deflate(flush)
+
+		from := 0
+		have := len(z.out) - int(z.strm.availOut())
+		for have > 0 {
+			var n int
+			n, z.err = z.w.Write(z.out[from:have])
+			if z.err != nil {
+				z.strm.deflateEnd()
+				return 0
+			}
+			from += n
+			have -= n
+		}
+
+		if z.strm.availOut() != 0 {
+			break
+		}
+	}
+
+	if z.strm.availIn() != 0 {
+		panic(fmt.Errorf("czlib: unexpected error (2)"))
+	}
+
+	return len(p)
+}
+
+// Write bytes for compression
+func (z *Writer) Write(p []byte) (n int, err error) {
+	if z.err != nil {
+		return 0, z.err
+	}
+	n = z.write(p, Z_NO_FLUSH)
+	return n, z.err
+}
+
+// Flush contents for compression
+func (z *Writer) Flush() error {
+	if z.err != nil {
+		return z.err
+	}
+	z.write(nil, Z_SYNC_FLUSH)
+	return z.err
+}
+
+// Close writer
+func (z *Writer) Close() error {
+	if z.strm == nil {
+		panic(fmt.Errorf("czlib: stream closed"))
+	}
+	defer func() {
+		z.strm.deflateEnd()
+		zstreamFree(z.strm)
+		z.strm = nil
+		z.err = io.EOF
+	}()
+
+	if z.err != nil {
+		return z.err
+	}
+	z.write(nil, Z_FINISH)
+	if z.err != nil {
+		return z.err
+	}
+
+	return nil
+}

+ 41 - 0
writer_test.go

@@ -0,0 +1,41 @@
+package czlib
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestDeflate(t *testing.T) {
+	var b bytes.Buffer
+
+	w := NewWriter(&b)
+	_, err := w.Write([]byte("hello, world\n"))
+	checkFatalError(t, err)
+	err = w.Close()
+	checkFatalError(t, err)
+
+	expectedOut := []byte{120, 156, 203, 72, 205, 201, 201, 215, 81,
+		40, 207, 47, 202, 73, 225, 2, 0, 33, 231, 4, 147}
+
+	if !bytes.Equal(b.Bytes(), expectedOut) {
+		t.Errorf("output differs from expected")
+	}
+}
+
+func TestDeflateCustom(t *testing.T) {
+	var b bytes.Buffer
+
+	w, err := NewWriterCustom(&b, Z_BEST_COMPRESSION, Z_DEFLATED, MAX_WBITS, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY)
+	checkFatalError(t, err)
+	_, err = w.Write([]byte("hello, world\n"))
+	checkFatalError(t, err)
+	err = w.Close()
+	checkFatalError(t, err)
+
+	expectedOut := []byte{120, 218, 203, 72, 205, 201, 201, 215, 81,
+		40, 207, 47, 202, 73, 225, 2, 0, 33, 231, 4, 147}
+
+	if !bytes.Equal(b.Bytes(), expectedOut) {
+		t.Errorf("output differs from expected")
+	}
+}

+ 60 - 0
zstream.c

@@ -0,0 +1,60 @@
+#include <zlib.h>
+#include <stdlib.h>
+
+int zstream_deflate_init(z_stream *strm, int level, int method, int windowBits, int memLevel, int strategy) {
+  strm->zalloc = Z_NULL;
+  strm->zfree  = Z_NULL;
+  strm->opaque = Z_NULL;
+  return deflateInit2(strm, level, method, windowBits, memLevel, strategy);
+}
+
+int zstream_deflate(z_stream *strm, int flush) {
+  return deflate(strm, flush);
+}
+
+int zstream_deflate_end(z_stream *strm) {
+  return deflateEnd(strm);
+}
+
+void zstream_set_in_buf(z_stream *strm, void *buf, unsigned int len) {
+  strm->next_in  = (Bytef*)buf;
+  strm->avail_in = len;
+}
+
+void zstream_set_out_buf(z_stream *strm, void *buf, unsigned int len) {
+  strm->next_out  = (Bytef*)buf;
+  strm->avail_out = len;
+}
+
+unsigned int zstream_avail_in(z_stream *strm) {
+  return strm->avail_in;
+}
+
+unsigned int zstream_avail_out(z_stream *strm) {
+  return strm->avail_out;
+}
+
+int zstream_inflate_init(z_stream *strm, int windowBits) {
+  strm->zalloc   = Z_NULL;
+  strm->zfree    = Z_NULL;
+  strm->opaque   = Z_NULL;
+  strm->avail_in = 0;
+  strm->next_in  = Z_NULL;
+  return inflateInit2(strm, windowBits);
+}
+
+int zstream_inflate(z_stream *strm, int flush) {
+  return inflate(strm, flush);
+}
+
+void zstream_inflate_end(z_stream *strm) {
+  inflateEnd(strm);
+}
+
+z_stream* zstream_alloc() {
+  return calloc(1, sizeof(z_stream));
+}
+
+void zstream_free(z_stream *strm) {
+  free(strm);
+}

+ 134 - 0
zstream.go

@@ -0,0 +1,134 @@
+package czlib
+
+// #cgo CFLAGS: -Werror=implicit
+// #cgo pkg-config: zlib
+// #include "zstream.h"
+import "C"
+
+import (
+	"fmt"
+	"unsafe"
+)
+
+type zstream C.z_stream
+
+const (
+	// Allowed flush values
+	Z_NO_FLUSH      = C.Z_NO_FLUSH
+	Z_PARTIAL_FLUSH = C.Z_PARTIAL_FLUSH
+	Z_SYNC_FLUSH    = C.Z_SYNC_FLUSH
+	Z_FULL_FLUSH    = C.Z_FULL_FLUSH
+	Z_FINISH        = C.Z_FINISH
+	Z_BLOCK         = C.Z_BLOCK
+	Z_TREES         = C.Z_TREES
+
+	// Return codes for the compression/decompression functions
+	Z_OK            = C.Z_OK
+	Z_STREAM_END    = C.Z_STREAM_END
+	Z_NEED_DICT     = C.Z_NEED_DICT
+	Z_ERRNO         = C.Z_ERRNO
+	Z_STREAM_ERROR  = C.Z_STREAM_ERROR
+	Z_DATA_ERROR    = C.Z_DATA_ERROR
+	Z_MEM_ERROR     = C.Z_MEM_ERROR
+	Z_BUF_ERROR     = C.Z_BUF_ERROR
+	Z_VERSION_ERROR = C.Z_VERSION_ERROR
+
+	// Compression levels
+	Z_NO_COMPRESSION      = C.Z_NO_COMPRESSION
+	Z_BEST_SPEED          = C.Z_BEST_SPEED
+	Z_BEST_COMPRESSION    = C.Z_BEST_COMPRESSION
+	Z_DEFAULT_COMPRESSION = C.Z_DEFAULT_COMPRESSION
+
+	// Compression strategy
+	Z_FILTERED         = C.Z_FILTERED
+	Z_HUFFMAN_ONLY     = C.Z_HUFFMAN_ONLY
+	Z_RLE              = C.Z_RLE
+	Z_FIXED            = C.Z_FIXED
+	Z_DEFAULT_STRATEGY = C.Z_DEFAULT_STRATEGY
+
+	// Deflate compression method
+	Z_DEFLATED = C.Z_DEFLATED
+
+	// Various zlib constants
+	MAX_WBITS     = C.MAX_WBITS
+	MAX_MEM_LEVEL = C.MAX_MEM_LEVEL
+)
+
+func (strm *zstream) deflateInit(level int, method int, windowBits int, memLevel int, strategy int) error {
+	result := C.zstream_deflate_init((*C.z_stream)(strm), C.int(level), C.int(method), C.int(windowBits), C.int(memLevel), C.int(strategy))
+	if result != Z_OK {
+		return fmt.Errorf("czlib: deflate init failed (%v) :%v", result, strm.message())
+	}
+	return nil
+}
+
+func (strm *zstream) deflate(flag int) {
+	ret := C.zstream_deflate((*C.z_stream)(strm), C.int(flag))
+	if ret == Z_STREAM_ERROR {
+		panic(fmt.Errorf("czlib: unexpected error (1)"))
+	}
+}
+
+func (strm *zstream) deflateEnd() {
+	C.zstream_deflate_end((*C.z_stream)(strm))
+}
+
+func (strm *zstream) setInBuf(buf []byte, size int) {
+	if buf == nil {
+		C.zstream_set_in_buf((*C.z_stream)(strm), nil, C.uint(size))
+	} else {
+		C.zstream_set_in_buf((*C.z_stream)(strm), unsafe.Pointer(&buf[0]), C.uint(size))
+	}
+}
+
+func (strm *zstream) setOutBuf(buf []byte, size int) {
+	if buf == nil {
+		C.zstream_set_out_buf((*C.z_stream)(strm), nil, C.uint(size))
+	} else {
+		C.zstream_set_out_buf((*C.z_stream)(strm), unsafe.Pointer(&buf[0]), C.uint(size))
+	}
+}
+
+func (strm *zstream) availIn() int {
+	return int(C.zstream_avail_in((*C.z_stream)(strm)))
+}
+
+func (strm *zstream) availOut() int {
+	return int(C.zstream_avail_out((*C.z_stream)(strm)))
+}
+
+func (strm *zstream) inflateInit(windowBits int) error {
+	result := C.zstream_inflate_init((*C.z_stream)(strm), C.int(windowBits))
+	if result != Z_OK {
+		return fmt.Errorf("czlib: inflate init failed (%v): %v", result, strm.message())
+	}
+	return nil
+}
+
+func (strm *zstream) inflate(flush int) (int, error) {
+	ret := C.zstream_inflate((*C.z_stream)(strm), C.int(flush))
+	switch ret {
+	case Z_NEED_DICT:
+		ret = Z_DATA_ERROR
+		fallthrough
+	case Z_DATA_ERROR, Z_MEM_ERROR:
+		return int(ret), fmt.Errorf("czlib: inflate failed (%v): %v", ret, strm.message())
+	}
+	return int(ret), nil
+}
+
+func (strm *zstream) inflateEnd() {
+	C.zstream_inflate_end((*C.z_stream)(strm))
+}
+
+func zstreamAlloc() *zstream {
+	return (*zstream)(C.zstream_alloc())
+}
+
+func zstreamFree(strm *zstream) {
+	C.zstream_free((*C.z_stream)(strm))
+}
+
+func (strm *zstream) message() string {
+	return C.GoString(strm.msg)
+}

+ 18 - 0
zstream.h

@@ -0,0 +1,18 @@
+#ifndef _ZSTREAM_H
+#define _ZSTREAM_H
+#include <zlib.h>
+
+int zstream_deflate_init(z_stream *strm, int level, int method, int windowBits, int memLevel, int strategy);
+int zstream_deflate(z_stream *strm, int flush);
+int zstream_deflate_end(z_stream *strm);
+void zstream_set_in_buf(z_stream *strm, void *buf, unsigned int len);
+void zstream_set_out_buf(z_stream *strm, void *buf, unsigned int len);
+unsigned int zstream_avail_in(z_stream *strm);
+unsigned int zstream_avail_out(z_stream *strm);
+int zstream_inflate_init(z_stream *strm, int windowBits);
+int zstream_inflate(z_stream *strm, int flush);
+void zstream_inflate_end(z_stream *strm);
+z_stream* zstream_alloc();
+void zstream_free(z_stream *strm);
+
+#endif