#define PY_SSIZE_T_CLEAN #define Py_LIMITED_API 0x030B0000 #include <stdlib.h> #include <Python.h> #include "zlib.h" #define DEF_MEM_LEVEL 8 // malloc & free copied from CPython static void* PyZlib_Malloc(voidpf ctx, uInt items, uInt size) { if (size != 0 && items > (size_t)PY_SSIZE_T_MAX / size) return NULL; /* PyMem_Malloc() cannot be used: the GIL is not held when inflate() and deflate() are called */ /* The builtin zlib module uses PyMem_RawMalloc here, but this was only added to the stable ABI from Python 3.13, so use plain malloc for now */ return malloc((size_t)items * (size_t)size); } static void PyZlib_Free(voidpf ctx, void *ptr) { /* The builtin zlib module uses PyMem_RawFree here, but this was only added to the stable ABI from Python 3.13, so use plain free for now */ free(ptr); } static PyObject * compress_into(PyObject *module, PyObject *args) { PyObject *return_value = NULL; Py_buffer input, output; Py_ssize_t bytes_written; int level, wbits; z_stream zst; if (!PyArg_ParseTuple(args, "y*w*ii", &input, &output, &level, &wbits)) { return NULL; } if (output.len <= 0) { PyErr_SetString(PyExc_ValueError, "Output buffer may not be 0 size"); goto done; } zst.opaque = NULL; zst.zalloc = PyZlib_Malloc; zst.zfree = PyZlib_Free; zst.next_in = input.buf; zst.avail_in = input.len; zst.next_out = output.buf; zst.avail_out = output.len; int err = deflateInit2(&zst, level, Z_DEFLATED, wbits, DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY); switch (err) { case Z_OK: break; case Z_MEM_ERROR: PyErr_SetString(PyExc_MemoryError, "Out of memory while compressing data"); goto done; case Z_STREAM_ERROR: PyErr_SetString(PyExc_ValueError, "Bad compression level"); goto done; default: deflateEnd(&zst); PyErr_SetString(PyExc_RuntimeError, "Other error"); //zlib_error(state, zst, err, "while compressing data"); goto done; } Py_BEGIN_ALLOW_THREADS err = deflate(&zst, Z_FINISH); Py_END_ALLOW_THREADS switch (err) { case Z_STREAM_END: break; case Z_OK: case Z_BUF_ERROR: deflateEnd(&zst); PyErr_SetString(PyExc_BufferError, "Not enough space in output buffer"); goto done; default: deflateEnd(&zst); PyErr_SetString(PyExc_RuntimeError, "Other error"); goto done; } bytes_written = output.len - zst.avail_out; deflateEnd(&zst); return_value = PyLong_FromSsize_t(bytes_written); done: PyBuffer_Release(&input); PyBuffer_Release(&output); return return_value; } static PyMethodDef ZlibIntoMethods[] = { {"compress_into", compress_into, METH_VARARGS, "zlib compress data into a buffer"}, {NULL, NULL, 0, NULL} }; static struct PyModuleDef zlibintomodule = { PyModuleDef_HEAD_INIT, "zlib_into", NULL, // docstring -1, ZlibIntoMethods }; PyMODINIT_FUNC PyInit_zlib_into(void) { return PyModule_Create(&zlibintomodule); }