CodeCommitsIssuesPull requestsActionsInsightsSecurity
master

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS
SSH

Download ZIP

pg_kc.c

919lines · modepreview

/**

Copyright 2011 CloudFlare, Inc.
@author Ian Pye <ian@cloudflare.com>

Provides a bridge from PostgreSQL to Kyoto Cabinet.

Use at your own risk.

*/

#include <kclangc.h>

#include <postgres.h>
#include <utils/builtins.h>
#include <utils/array.h>
#include <catalog/pg_type.h>
#include <fmgr.h>
#include <funcapi.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>

#include "c/entry.pb-c.h"
#include "common.h"

#ifdef PG_MODULE_MAGIC
    PG_MODULE_MAGIC;
#endif

// Debug switches -- uncomment to enable.
//   CF_DUBUG:        emit NOTICE-level tracing from the functions below.
//   CF_NO_DB_IS_ERR: raise an ERROR (instead of quietly returning) when a DB cannot be opened.
//#define CF_DUBUG 1;
//#define CF_NO_DB_IS_ERR 1;

/* Buffer capacities and argument positions shared by the functions in this file. */
enum {
    MAX_JUMP_KEY_LEN     = 256,   /* capacity of the kc_expand() jump key */
    MIN_ARGS             = 3,     /* kc_expand() needs at least map_name, start_time, result_id */
    START_VARIABLE_INDEX = 3,     /* first optional key-label argument of kc_expand() */
    END_VARIABLE_INDEX   = 7,     /* one past the last optional key-label argument */
    MAX_KC_ROW_ENTRY     = 256,   /* capacity of each string field of a row / message */
    KC_MAX_ROW_SER       = 1028,  /* bound used when rendering a KC_ROW as text */
    KC_MAX_RID           = 1028   /* capacity of composed key and path buffers */
};

// Cross-call state of the kc_expand() set-returning function (stored in
// funcctx->user_fctx): the open DB, the cursor walking it, and the record
// currently being expanded into rows.
typedef struct {
    KCDB                        *db;        // main aggregate DB; NULL when it could not be opened
    char                        *db_path;   // NOTE(review): not referenced anywhere in the visible code
    KCCUR                       *cur;       // cursor over db, positioned at jump_key on the first call
    char                        *jump_key;  // result_id plus optional labels, joined with CF_LABEL_SEP
    Cloudflare__ZoneTimeBucket  *msg;       // unpacked record currently being expanded (NULL if none)
    unsigned int                next_map;   // index of the next row to emit from msg
    KCDB                        *kv_db;     // external KV map DB, used when msg->kv_map_file is set
    KCCUR                       *kv_cur;    // cursor over kv_db
} KC_ENTRY;

// Map name, start time and KV map name describing one aggregate DB.
// NOTE(review): not referenced by any code visible here (only by the
// commented-out @TODO field in KC_ROW) -- confirm whether it is still needed.
typedef struct {
    char                        *map_name;
    char                        *start_time;
    char                        *kv_map_name;
} KC_DB_INFO;

// Row type produced by kc_row_in() / kc_expand() and read by the kc_* accessor
// functions: one counter (key, value) plus the labels of the record it came from.
// NOTE(review): the string fields are raw pointers to palloc'd buffers, so a
// KC_ROW is only meaningful inside the backend and memory context that built
// it; the pointers would not survive the datum being written to disk -- confirm
// values of this type are never stored.
typedef struct {
    int32                       vl_len_;                /* varlena header (do not touch directly!) */
    char                        *doctype;
    char                        *classification;
    char                        *pop;
    char                        *psource;
    char                        *key;
    int64_t                     value;
    //KC_DB_INFO                  db_info; // @TODO -- should this be set, allowing for working in/out functions?
} KC_ROW;

PG_FUNCTION_INFO_V1(kc_row_in);

/*
 * Input function for the KC_ROW type.
 *
 * @TODO -- the text argument is not parsed yet; every call returns a row
 * with empty label fields, a placeholder key and a zero value.
 *
 * The struct and all of its string buffers are zero-filled (palloc0), so
 * kc_row_out() and the accessors never read uninitialised memory.
 * Previously memset(out, '0', ...) filled the struct with ASCII '0' bytes,
 * and four of the string fields were left uninitialised.
 */
Datum kc_row_in(PG_FUNCTION_ARGS) {
    KC_ROW     *out = (KC_ROW *) palloc0(sizeof(KC_ROW));

    SET_VARSIZE(out, sizeof(KC_ROW));
    out->doctype = (char *) palloc0(MAX_KC_ROW_ENTRY);
    out->classification = (char *) palloc0(MAX_KC_ROW_ENTRY);
    out->pop = (char *) palloc0(MAX_KC_ROW_ENTRY);
    out->psource = (char *) palloc0(MAX_KC_ROW_ENTRY);
    out->key = (char *) palloc0(MAX_KC_ROW_ENTRY);

    /* snprintf always NUL-terminates, unlike strncpy. */
    snprintf(out->key, MAX_KC_ROW_ENTRY, "%s", "lolkey -- fill me in");
    out->value = 0;

    PG_RETURN_POINTER(out);
}

PG_FUNCTION_INFO_V1(kc_row_out);

/*
 * Output function for KC_ROW: renders
 * "{classification,doctype,pop,psource,key,value}".
 *
 * The result buffer has the same size (KC_MAX_ROW_SER) that snprintf is
 * told it may fill. Previously only 256 bytes were allocated, so a long row
 * overflowed the heap. The int64 value is printed through an explicit
 * long long conversion instead of assuming int64_t is long.
 */
Datum kc_row_out(PG_FUNCTION_ARGS) {
    KC_ROW     *in = (KC_ROW *) PG_GETARG_POINTER(0);
    char       *result = (char *) palloc(KC_MAX_ROW_SER);

    snprintf(result, KC_MAX_ROW_SER, "{%s,%s,%s,%s,%s,%lld}", in->classification, in->doctype,
             in->pop, in->psource, in->key, (long long) in->value);
    PG_RETURN_CSTRING(result);
}

PG_FUNCTION_INFO_V1(kc_key);

/* Accessor: the counter key of a KC_ROW, as text. */
Datum kc_key(PG_FUNCTION_ARGS) {
    const KC_ROW *row = (const KC_ROW *) PG_GETARG_POINTER(0);

    PG_RETURN_TEXT_P(cstring_to_text(row->key));
}

PG_FUNCTION_INFO_V1(kc_val);

/* Accessor: the counter value of a KC_ROW, as int8. */
Datum kc_val(PG_FUNCTION_ARGS) {
    const KC_ROW *row = (const KC_ROW *) PG_GETARG_POINTER(0);

    PG_RETURN_INT64(row->value);
}

PG_FUNCTION_INFO_V1(kc_doctype);

/* Accessor: the doctype label of a KC_ROW, as text. */
Datum kc_doctype(PG_FUNCTION_ARGS) {
    const KC_ROW *row = (const KC_ROW *) PG_GETARG_POINTER(0);

    PG_RETURN_TEXT_P(cstring_to_text(row->doctype));
}

PG_FUNCTION_INFO_V1(kc_class);

/* Accessor: the classification label of a KC_ROW, as text. */
Datum kc_class(PG_FUNCTION_ARGS) {
    const KC_ROW *row = (const KC_ROW *) PG_GETARG_POINTER(0);

    PG_RETURN_TEXT_P(cstring_to_text(row->classification));
}

PG_FUNCTION_INFO_V1(kc_pop);

/* Accessor: the pop label of a KC_ROW, as text. */
Datum kc_pop(PG_FUNCTION_ARGS) {
    const KC_ROW *row = (const KC_ROW *) PG_GETARG_POINTER(0);

    PG_RETURN_TEXT_P(cstring_to_text(row->pop));
}

PG_FUNCTION_INFO_V1(kc_psource);

/* Accessor: the psource label of a KC_ROW, as text. */
Datum kc_psource(PG_FUNCTION_ARGS) {
    const KC_ROW *row = (const KC_ROW *) PG_GETARG_POINTER(0);

    PG_RETURN_TEXT_P(cstring_to_text(row->psource));
}

PG_FUNCTION_INFO_V1(kc_sum);

Datum kc_sum(PG_FUNCTION_ARGS) {
    int64_t    total = PG_GETARG_INT64(0);
    KC_ROW     *in = (KC_ROW *) PG_GETARG_POINTER(1);
    int64_t    res = total + in->value;

    PG_RETURN_INT64(res);
}

/*
 * Open, in writer mode, the Kyoto Cabinet DB for map_type under start_time:
 *     BASE_DB_DIR/<start_time>/pg_agg_map_<map_type>.<DB_TYPE>
 *
 * Returns true on success. On failure it returns false, or raises an ERROR
 * when built with CF_NO_DB_IS_ERR.
 *
 * The snprintf bound now matches the buffer (it used to be 256 for a 512-byte
 * buffer). A path that still does not fit is rejected with an ERROR, so a
 * truncated and therefore unrelated path is never opened.
 */
bool open_db (KCDB *db, char *map_type, char *start_time) {
    char db_buffer[512];
    int  needed = snprintf(db_buffer, sizeof(db_buffer), "%s/%s/pg_agg_map_%s.%s",
                           BASE_DB_DIR, start_time, map_type, DB_TYPE);

    if (needed < 0 || (size_t) needed >= sizeof(db_buffer)) {
        ereport(ERROR,
                (errcode(ERRCODE_NAME_TOO_LONG),
                 errmsg("db path for map \"%s\" is too long", map_type)));
    }

    if (!kcdbopen(db, db_buffer, KCOWRITER)) {
#ifdef CF_NO_DB_IS_ERR
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                 errmsg("Error opening db: \"%s\", \"%s\". Make sure that the map_name is valid.", 
                        db_buffer, kcecodename(kcdbecode(db)))));
#endif
#ifdef CF_DUBUG
        ereport(NOTICE,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("Error opening db: \"%s\", \"%s\". Make sure that the map_name is valid.", 
                        db_buffer, kcecodename(kcdbecode(db)))));
#endif
        return false;
    }

#ifdef CF_DUBUG        
    ereport(NOTICE,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("Opened db -- %s.", db_buffer)));
#endif 

    return true;
}

/**
   General idea is to provide an interface into a map_name:doctype:pop:psource key stored tree table.
 */

PG_FUNCTION_INFO_V1(kc_expand);

Datum kc_expand(PG_FUNCTION_ARGS) {

    KC_ENTRY                        *search;
    FuncCallContext                 *funcctx;
    int                             call_cntr;
    char                            *kbuf;
    size_t                          ksiz, vsiz;
    const char                      *cvbuf;
    char                            *kv_kbuf = NULL; 
    size_t                          kv_ksiz;
    int                             done;

    /* stuff done only on the first call of the function */
    if (SRF_IS_FIRSTCALL()) {
        MemoryContext   oldcontext;

        /* create a function context for cross-call persistence */
        funcctx = SRF_FIRSTCALL_INIT();

        /* switch to memory context appropriate for multiple function calls */
        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

        // Make sure that there are enough args.
        if (PG_NARGS() < MIN_ARGS) {
            ereport(ERROR,
                    (errcode(ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION),
                     errmsg("Must run expand with at least %d args!", MIN_ARGS)));
        }

        /* Make the kcdb here. */
        search = (KC_ENTRY *)palloc(sizeof(KC_ENTRY)); 
        search->db = kcdbnew();
        if (open_db (search->db, text_to_cstring(PG_GETARG_TEXT_PP(0)), text_to_cstring(PG_GETARG_TEXT_PP(1)))) {

            // Set the key to jump into:
            // Call with -- map_name, result_id, class, doctype, pop, psource
            // Here, map_name describes a db to open.
            // Otherwise, result_id:class:doctype:pop:psource
            (search->jump_key) = (char *) palloc(MAX_JUMP_KEY_LEN * sizeof(char));

            int index_point;
            search->jump_key = text_to_cstring(PG_GETARG_TEXT_PP(2));
            int size_left = MAX_JUMP_KEY_LEN;
            for (index_point = START_VARIABLE_INDEX; index_point < END_VARIABLE_INDEX; index_point++) {
                if (PG_NARGS() > index_point) {
                    char *next_idx = text_to_cstring(PG_GETARG_TEXT_PP(index_point));
                    if (next_idx != NULL) {
                        size_left = size_left - (2 + strlen(next_idx));
                        strncat (search->jump_key, CF_LABEL_SEP, size_left);
                        strncat (search->jump_key, next_idx, size_left);
                    }
                }
            }
            
#ifdef CF_DUBUG
            ereport(NOTICE,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("Setting jump buffer -- [%s]", search->jump_key)));
#endif
            
            // Create a cursor, and set it to the base point looking for entries.
            search->cur = kcdbcursor(search->db);
            kccurjumpkey(search->cur, search->jump_key, MAX_JUMP_KEY_LEN);
        } else {
            search->db = NULL;
        }

        search->next_map = 0;
        search->msg = NULL;
        
        // Save the search struct for the subsequent calls.
        funcctx->user_fctx = search;

        MemoryContextSwitchTo(oldcontext);
    }

    /* stuff done on every call of the function */
    funcctx = SRF_PERCALL_SETUP();

    call_cntr = funcctx->call_cntr;
    search = (KC_ENTRY *) funcctx->user_fctx;
    
    // If no current msg, try to get the next one.
    done = 1;

#ifdef CF_DUBUG
    ereport(NOTICE,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("beginning run")));
#endif

    if (search->msg) {

#ifdef CF_DUBUG  
        ereport(NOTICE,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("Incrementing next from map %d -- %zu", search->next_map, search->msg->n_map_entry)));
#endif

        // Case if we are using the external cursor running over kv map.
        // Ready the next 
        if (search->msg->kv_map_file) {
            
            if ((kv_kbuf = kccurgetkey(search->kv_cur, &kv_ksiz, 1)) == NULL) {
                done = 1;
                kccurdel(search->kv_cur);
                kcdbendtran (search->kv_db, 1);            
                if (!kcdbclose(search->kv_db)) {
                    ereport(ERROR,
                            (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                             errmsg("Error Closeing db: \"%s\"", kcecodename(kcdbecode(search->kv_db)))));
                }

                // Also need to free this.
                cloudflare__zone_time_bucket__free_unpacked(search->msg, NULL);
                search->msg = NULL;

            } else {
                done = 0;
            }


        } else {
            if (search->next_map >= search->msg->n_map_entry) {
                // Done with this msg -- move on to the next one.
                cloudflare__zone_time_bucket__free_unpacked(search->msg, NULL);
                search->msg = NULL;
            } else {
                done = 0;
            }
        }
    }

    if (search->db && !search->msg) {
      
#ifdef CF_DUBUG  
        ereport(NOTICE,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("Getting new buf -- %s", search->jump_key)));
#endif        

        if ((kbuf = kccurget(search->cur, &ksiz, &cvbuf, &vsiz, 1)) != NULL) {
            // Pull up the PB and expand it.
            search->msg = cloudflare__zone_time_bucket__unpack(NULL, vsiz, (const uint8_t *)cvbuf);
            if (search->msg == NULL) {   // Something failed
                ereport(ERROR,
                        (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                         errmsg("error unpacking incoming message")));
                done = 1;
            } else {
                // Does the buffer match the searched for string?
                // @TODO -- bound this?
                if (strstr(search->msg->db_key, search->jump_key)) {
                    done = 0;
                    search->next_map = 0;

                    // And load the kvkc if needed.
                    if (search->msg->kv_map_file) {
                        
#ifdef CF_DUBUG  
                        ereport(NOTICE,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("Switching to kvs %s", search->msg->kv_map_file)));
#endif

                        search->kv_db = kcdbnew();
                        
                        if (!kcdbopen(search->kv_db, search->msg->kv_map_file, KCOWRITER)) {
#ifdef CF_NO_DB_IS_ERR
                            ereport(ERROR,
                                    (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                                     errmsg("Error opening db: \"%s\", \"%s\". Make sure that the map_name is valid.", 
                                            search->msg->kv_map_file, kcecodename(kcdbecode(search->kv_db)))));
#endif
#ifdef CF_DUBUG
                            ereport(NOTICE,
                                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                     errmsg("Error opening db: \"%s\", \"%s\". Make sure that the map_name is valid.", 
                                            search->msg->kv_map_file, kcecodename(kcdbecode(search->kv_db)))));
#endif
                            done = 1;
                        } else {
                            kcdbbegintran (search->kv_db, 0);
                            search->kv_cur = kcdbcursor(search->kv_db);
                            kccurjump(search->kv_cur);   

                            if ((kv_kbuf = kccurgetkey(search->kv_cur, &kv_ksiz, 1)) == NULL) {
                                done = 1;
                                kccurdel(search->kv_cur);
                                kcdbendtran (search->kv_db, 1);
                                if (!kcdbclose(search->kv_db)) {
                                    ereport(ERROR,
                                            (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                                             errmsg("Error Closeing db: \"%s\"", kcecodename(kcdbecode(search->kv_db)))));
                                }
                            } else {
                                done = 0;
                            }
                        }
                    }
                } else {
                    done = 1;
                }
            }
            kcfree(kbuf);
        } else {
#ifdef CF_DUBUG
            ereport(NOTICE,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("no msg to find")));
#endif
            done = 1;
        }
    }

#ifdef CF_DUBUG
    ereport(NOTICE,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("Done? %d -- next buf -- %d", done, search->next_map)));   
#endif

    // Take the next itteration over the cursor. If the next is NULL or else not matching the resultid passed in
    // End. Otherwise, parse the value, populating the next row of the returning tuple.
    if (!done) {
        KC_ROW                          *out;
        Datum                           result;

        size_t size = sizeof(KC_ROW);
        out = (KC_ROW *)palloc(size);
        memset(out, '0', size);
        SET_VARSIZE(out, size);

        out->classification = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
        out->doctype = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
        out->pop = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
        out->psource = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
        out->key = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));

        strncpy(out->classification, search->msg->classification, MAX_KC_ROW_ENTRY);
        strncpy(out->doctype, search->msg->doctype, MAX_KC_ROW_ENTRY);
        strncpy(out->pop, search->msg->pop, MAX_KC_ROW_ENTRY);
        strncpy(out->psource, search->msg->psource, MAX_KC_ROW_ENTRY);

        if (search->msg->kv_map_file) {

#ifdef CF_DUBUG
            ereport(NOTICE,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("getting val from -- [%s]", search->msg->kv_map_file)));
#endif

            snprintf(out->key, MAX_KC_ROW_ENTRY, "%s", kv_kbuf);
            out->value = kcdbincrint (search->kv_db, kv_kbuf, kv_ksiz, 0);

            if (out->value == INT64_MIN) {
                ereport(NOTICE,
                        (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                         errmsg("ERROR Getting val from key -- [%s], %s", kv_kbuf, kcecodename(kcdbecode(search->kv_db)))));
            }

            kcfree(kv_kbuf);
        } else {

#ifdef CF_DUBUG
            ereport(NOTICE,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("Loading %s %ld", search->msg->map_entry[search->next_map]->key, 
                            search->msg->map_entry[search->next_map]->value)));
#endif

            snprintf(out->key, MAX_KC_ROW_ENTRY, "%s", search->msg->map_entry[search->next_map]->key);        
            out->value = search->msg->map_entry[search->next_map]->value;
        }

        result = PointerGetDatum(out);

        /* clean up (this is not really necessary) */
        pfree(out->classification);
        pfree(out->doctype);
        pfree(out->pop);
        pfree(out->psource);
        pfree(out->key);
        pfree(out);

        // Remember that we are going to the next step.
        search->next_map++;

        SRF_RETURN_NEXT(funcctx, result);
    } else {    /* do when there is no more left */
        if (search->db) {
            kccurdel(search->cur);
            if (!kcdbclose(search->db)) {
                ereport(ERROR,
                        (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                         errmsg("Error Closeing db: \"%s\"", kcecodename(kcdbecode(search->db)))));
            }
            
            if (search->msg != NULL) {
                cloudflare__zone_time_bucket__free_unpacked(search->msg, NULL);
            }
            
            pfree(search->jump_key);
        }
        pfree(search);

#ifdef CF_DUBUG
        ereport(NOTICE,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("Done with run")));
#endif

        // Don't delete db, this leads to segfaults.
        SRF_RETURN_DONE(funcctx);
    }
}

inline void set_kv_path(Cloudflare__ZoneTimeBucket *msg, char *map_name, char *start_time_uid, KCDB* db) {

    struct stat path_buf;
    char dir_buf[KC_MAX_RID];
    char *found;

    msg->kv_map_file = (char *)palloc(KC_MAX_RID * sizeof(char));
    snprintf(msg->kv_map_file, KC_MAX_RID, "%s/%s/%s/%s.%s", 
             CF_KV_MAP_BASE,
             start_time_uid,
             map_name,
             msg->db_key,
             DB_TYPE);

    snprintf(dir_buf, KC_MAX_RID, "%s", CF_KV_MAP_BASE);        
    if (stat(dir_buf, &path_buf) != 0) {
        mkdir (dir_buf, NEW_DIR_MASK);
    }
    chmod (dir_buf, NEW_DIR_MODE);
    
    found = strtok (start_time_uid, "/");
    while (found != NULL) {
        strncat (dir_buf, "/", KC_MAX_RID);
        strncat (dir_buf, found, KC_MAX_RID);
        if (stat(dir_buf, &path_buf) != 0) {
            mkdir (dir_buf, NEW_DIR_MODE);
        }
        chmod (dir_buf, NEW_DIR_MODE);
        found = strtok (NULL, "/");
    }
    strncat (dir_buf, "/", KC_MAX_RID);
    strncat (dir_buf, map_name, KC_MAX_RID);
    if (stat(dir_buf, &path_buf) != 0) {
        mkdir (dir_buf, NEW_DIR_MODE);
    }
    chmod (dir_buf, NEW_DIR_MODE);

    if (!kcdbopen(db, msg->kv_map_file, KCOWRITER | KCOCREATE)) {
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                 errmsg("KV open error for: %s %s\n", msg->kv_map_file, kcecodename(kcdbecode(db)))));
    }
    kcdbbegintran (db, 0);
    kcdbclear (db);
    chmod (msg->kv_map_file, FILE_MODE);
}

/*
 * Merge msg_new's counters into the KV map db: each value is added (with
 * kcdbincrint) to the integer record stored under the same key.
 *
 * If msg_new is itself KV-backed, its KV file is opened and walked with a
 * cursor. Otherwise its in-message map_entry array is used.
 *
 * msg_old is not used, because the merged data lives in db; the parameter is
 * kept for signature compatibility.
 *
 * Returns the number of counters added to db. If a value cannot be read
 * from msg_new's KV file, the counter is reported with a NOTICE and skipped;
 * previously kcdbincrint's INT64_MIN failure sentinel was merged as if it
 * were data. A failure to close msg_new's KV file is also reported instead
 * of ignored.
 */
int merge_using_kv_map(Cloudflare__ZoneTimeBucket *msg_old, Cloudflare__ZoneTimeBucket *msg_new, KCDB* db) {
    int num_new = 0;

    (void) msg_old;

    if (msg_new->kv_map_file) {
        KCDB   *new_db = kcdbnew();
        KCCUR  *cur;
        char   *kv_kbuf;
        size_t kv_ksiz;

        /* open the database */
        if (!kcdbopen(new_db, msg_new->kv_map_file, KCOWRITER)) {
            ereport(ERROR,
                    (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                     errmsg("open error: %s", kcecodename(kcdbecode(new_db)))));
        }

        cur = kcdbcursor(new_db);
        kccurjump(cur);

        while ((kv_kbuf = kccurgetkey(cur, &kv_ksiz, 1)) != NULL) {
            /* Incrementing by 0 reads the stored integer; INT64_MIN signals failure. */
            int64_t value = kcdbincrint (new_db, kv_kbuf, kv_ksiz, 0);

            if (value == INT64_MIN) {
                ereport(NOTICE,
                        (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                         errmsg("ERROR Getting val from key -- [%s], %s", kv_kbuf, kcecodename(kcdbecode(new_db)))));
            } else {
#ifdef CF_DUBUG
                ereport(NOTICE,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("Saving: %s -- %lld", kv_kbuf, (long long) value)));
#endif
                kcdbincrint (db, kv_kbuf, kv_ksiz, value);
                num_new++;
            }
            kcfree(kv_kbuf);
        }

        kccurdel(cur);
        if (!kcdbclose(new_db)) {
            ereport(NOTICE,
                    (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                     errmsg("Error Closeing db: \"%s\"", kcecodename(kcdbecode(new_db)))));
        }
    } else {
        size_t i;

        for (i = 0; i < msg_new->n_map_entry; i++) {
            const Cloudflare__ZoneTimeBucket__Counter *entry = msg_new->map_entry[i];

            kcdbincrint (db, entry->key, strlen(entry->key), entry->value);
            num_new++;
        }
    }

    return num_new;
}

/*
 * Merge msg_new's counters into msg_old's in-message map_entry array.
 * For a key already present in msg_old, the values are summed. A new key is
 * appended as a palloc'd copy of the counter with its own copy of the key.
 *
 * Only entries that existed before this call are searched for matches, so
 * keys within msg_new are assumed to be unique.
 *
 * msg_old->map_entry must have room for every appended entry. As used by
 * kc_shrink(), this path runs only when the combined entry count is at most
 * MAX_KEYS_BEFORE_KV_MAP, which is the capacity kc_shrink() allocates.
 *
 * Returns the number of keys appended to msg_old.
 */
int merge_messages_basic(Cloudflare__ZoneTimeBucket *msg_old, Cloudflare__ZoneTimeBucket *msg_new) {
    size_t last_entry = msg_old->n_map_entry;
    size_t i, j;
    int    num_new = 0;

    for (i = 0; i < msg_new->n_map_entry; i++) {
        Cloudflare__ZoneTimeBucket__Counter *src = msg_new->map_entry[i];
        Cloudflare__ZoneTimeBucket__Counter *copy;
        bool found = false;

        for (j = 0; j < last_entry; j++) {
            if (strcmp(msg_old->map_entry[j]->key, src->key) == 0) {
                msg_old->map_entry[j]->value += src->value;
                found = true;
                break;
            }
        }
        if (found) {
            continue;
        }

        // If we get here, it is a new key. Copy over the info.
        copy = palloc(sizeof(*copy));
        memcpy(copy, src, sizeof(*copy));
        /*
         * The caller frees msg_new right after merging, which also frees src's
         * unknown-field storage. Don't let the copy keep pointing at it.
         * NOTE(review): any other pointer fields of Counter besides key would
         * need the same treatment -- confirm against entry.proto.
         */
        copy->base.n_unknown_fields = 0;
        copy->base.unknown_fields = NULL;
        copy->key = pstrdup(src->key);

        msg_old->map_entry[msg_old->n_map_entry] = copy;
        msg_old->n_map_entry++;
        num_new++;
    }
    return num_new;
}

// Now, a function to shrink KC values down. Need to open in write mode for this ...
// map_name, new rid, array of rids, class, doctype, pop, psource
// Idea is to:
// Make a new buffer, with the passed in value.
// Iterate over the list of rids, doing rid:class:doctype:pop:psource. Get buffer.
// Go over hashmap, adding to the buffers map.
// Save using the new buffer info.

PG_FUNCTION_INFO_V1(kc_shrink);

Datum kc_shrink(PG_FUNCTION_ARGS) {
    
    char       *map_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
    char       *start_time = text_to_cstring(PG_GETARG_TEXT_PP(1)); // Start time + uid!!!
    char       *new_rid = text_to_cstring(PG_GETARG_TEXT_PP(2));
    ArrayType  *old_rids = PG_GETARG_ARRAYTYPE_P(3);
    char       *classification = text_to_cstring(PG_GETARG_TEXT_PP(4));
    char       *doctype = text_to_cstring(PG_GETARG_TEXT_PP(5));
    char       *pop = text_to_cstring(PG_GETARG_TEXT_PP(6));
    char       *psource = text_to_cstring(PG_GETARG_TEXT_PP(7));
    text       *tout;   
    int        i,j;
    Datum      *rid_datums;
    bool       *rid_nulls;
    int        rid_count;
    char       *next_rid;
    KCDB       *main_db;
    char       *vbuf;
    size_t      vsiz;

    // Open our DB.
    main_db = kcdbnew();
    if (!open_db (main_db, map_name, start_time)) {
        tout = cstring_to_text(new_rid);
        PG_RETURN_TEXT_P(tout);        
    }
    kcdbbegintran (main_db, 0);
    
    // First fill in what we can from the input.
    Cloudflare__ZoneTimeBucket msg = CLOUDFLARE__ZONE_TIME_BUCKET__INIT;
    
    msg.map_name = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.doctype = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.classification = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.pop = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.psource = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.result_id = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.db_key = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.db_path = (char *)palloc(MAX_KC_ROW_ENTRY * sizeof(char));
    msg.map_entry = palloc(MAX_KEYS_BEFORE_KV_MAP * sizeof(Cloudflare__ZoneTimeBucket__Counter));
    msg.n_map_entry = 0;

    strncpy(msg.map_name, map_name, MAX_KC_ROW_ENTRY);
    strncpy(msg.classification, classification, MAX_KC_ROW_ENTRY);
    strncpy(msg.doctype, doctype, MAX_KC_ROW_ENTRY);
    strncpy(msg.pop, pop, MAX_KC_ROW_ENTRY);
    strncpy(msg.psource, psource, MAX_KC_ROW_ENTRY);
    strncpy(msg.result_id, new_rid, KC_MAX_RID);
    snprintf(msg.db_path, MAX_KC_ROW_ENTRY, "%s%s%s", 
             map_name, "/", start_time);

    snprintf(msg.db_key, KC_MAX_RID, "%s%s%s%s%s%s%s%s%s%s%s", 
             new_rid, CF_LABEL_SEP,
             classification, CF_LABEL_SEP, 
             doctype, CF_LABEL_SEP,
             pop, CF_LABEL_SEP,
             psource, CF_LABEL_SEP,
             map_name);

    // Now run over the array.
    deconstruct_array(old_rids, TEXTOID, -1, false, 'i',
                      &rid_datums, &rid_nulls, &rid_count);
    if (ARR_HASNULL(old_rids)) {
        ereport(ERROR,
                (errcode(ERRCODE_ARRAY_ELEMENT_ERROR),
                 errmsg("cannot work with arrays containing NULLs")));
    }

    int num_new_keys = 0;
    int num_entries = 0;
    char keys_to_use[rid_count][KC_MAX_RID];
    Cloudflare__ZoneTimeBucket *msg_new[rid_count];
    j=0;
    for (i = 0; i < rid_count; i++) {
        next_rid = TextDatumGetCString(rid_datums[i]);
        snprintf(keys_to_use[i], KC_MAX_RID, "%s%s%s%s%s%s%s%s%s%s%s", 
                 next_rid, CF_LABEL_SEP, 
                 classification, CF_LABEL_SEP,
                 doctype, CF_LABEL_SEP,
                 pop, CF_LABEL_SEP,
                 psource, CF_LABEL_SEP,
                 map_name);

        vbuf = kcdbget(main_db, keys_to_use[i], strlen(keys_to_use[i]), &vsiz);
        if (vbuf) {
            msg_new[j] = cloudflare__zone_time_bucket__unpack(NULL, vsiz, (const uint8_t *)vbuf);
            if (msg_new[j] == NULL) {   // Something failed
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("error unpacking incoming message")));
            } else {
                if (msg_new[j]->kv_map_file) {
                    num_entries = MAX_KEYS_BEFORE_KV_MAP + 1;                    
                } else {
                    num_entries += msg_new[j]->n_map_entry; 
                }
                j++;
            }
            kcfree(vbuf);
        } else {
#ifdef CF_DUBUG
            ereport(NOTICE,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("get error on %s -- %s", keys_to_use[i], kcecodename(kcdbecode(main_db)))));
#endif
        }
    }
    
    // Now merge the buffers.
    KCDB* msg_db = NULL;
    if (num_entries > MAX_KEYS_BEFORE_KV_MAP) {
        msg_db = kcdbnew();
        set_kv_path(&msg, map_name, start_time, msg_db);
    }

    for (i = 0; i < j; i++) {
        if (num_entries > MAX_KEYS_BEFORE_KV_MAP) {
            num_new_keys += merge_using_kv_map(&msg, msg_new[i], msg_db);
        } else {
            num_new_keys += merge_messages_basic(&msg, msg_new[i]);
        }
        cloudflare__zone_time_bucket__free_unpacked(msg_new[i], NULL);
    }

    if (num_entries > MAX_KEYS_BEFORE_KV_MAP) {
        // Close the db.
        kcdbendtran (msg_db, 1);
        kcdbclose(msg_db);
    }

#ifdef CF_DUBUG
    ereport(NOTICE,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("saving: num map entries: %zu -- writting with %d keys", msg.n_map_entry, num_new_keys)));
#endif

    // Save the updated buffer.
    if (num_new_keys > 0) {
        unsigned int len;
        void *buf;
        len = cloudflare__zone_time_bucket__get_packed_size (&msg);  
        buf = palloc (len);           
        cloudflare__zone_time_bucket__pack (&msg, buf);
        if(!kcdbset(main_db, msg.db_key, strlen(msg.db_key), buf, len)) {
            ereport(ERROR,
                    (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                     errmsg("set error: %s\n", kcecodename(kcdbecode(main_db)))));
        }
        pfree (buf);
    }

    // Done!
    kcdbendtran (main_db, 1);
    if (!kcdbclose(main_db)) {
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                 errmsg("Error Closeing db: \"%s\"", kcecodename(kcdbecode(main_db)))));
    }

    tout = cstring_to_text(new_rid);
    PG_RETURN_TEXT_P(tout);
}


// Delete all entries in the db for the given map and prefix + any rid passed in.

PG_FUNCTION_INFO_V1(kc_delete);

Datum kc_delete(PG_FUNCTION_ARGS) {
    
    char       *map_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
    char       *start_time = text_to_cstring(PG_GETARG_TEXT_PP(1));
    ArrayType  *rids = PG_GETARG_ARRAYTYPE_P(2);
    int        i;
    Datum      *rid_datums;
    bool       *rid_nulls;
    int        rid_count;
    char       *next_rid;
    KCDB       *main_db;
    char       *vbuf;
    size_t      vsiz;
    int64_t     num_keys_to_run;
    int64_t     num_keys_deleted;
    char        **keys_to_use;
    Cloudflare__ZoneTimeBucket *msg_new;

    // Open our DB.
    main_db = kcdbnew();
    if (!open_db (main_db, map_name, start_time)) {
        PG_RETURN_INT64(0);
    }
    kcdbbegintran (main_db, 0);

    // Now run over the array.
    deconstruct_array(rids, TEXTOID, -1, false, 'i',
                      &rid_datums, &rid_nulls, &rid_count);
    if (ARR_HASNULL(rids)) {
        ereport(ERROR,
                (errcode(ERRCODE_ARRAY_ELEMENT_ERROR),
                 errmsg("cannot work with arrays containing NULLs")));
    }

    keys_to_use = (char **)palloc(KC_MAX_ENTRIES_PER_RID * sizeof(char));
    num_keys_deleted = 0;
    char prefixes_to_use[rid_count][KC_MAX_RID];

    for (i = 0; i < rid_count; i++) {
        next_rid = TextDatumGetCString(rid_datums[i]);
        snprintf(prefixes_to_use[i], KC_MAX_RID, "%s%s", next_rid, CF_LABEL_SEP);
        num_keys_to_run = kcdbmatchprefix (main_db,	prefixes_to_use[i], keys_to_use, KC_MAX_ENTRIES_PER_RID);
        if (num_keys_to_run != -1) {
            num_keys_deleted += num_keys_to_run;
            int next_key;
            for (next_key=0; next_key < num_keys_to_run; next_key++) {
                vbuf = kcdbget(main_db, keys_to_use[next_key], strlen(keys_to_use[next_key]), &vsiz);
                if (vbuf) {
                    msg_new = cloudflare__zone_time_bucket__unpack(NULL, vsiz, (const uint8_t *)vbuf);
                    if (msg_new == NULL) {   // Something failed
                        ereport(ERROR,
                                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                                 errmsg("error unpacking incoming message")));
                    } else {
                        if (msg_new->kv_map_file) {
                            unlink(msg_new->kv_map_file);
                        }
                        kcdbremove (main_db, keys_to_use[next_key], strlen(keys_to_use[next_key]));	
                    }
                    cloudflare__zone_time_bucket__free_unpacked(msg_new, NULL);
                    kcfree(vbuf);
                    kcfree(keys_to_use[next_key]);
                } else {
                    ereport(NOTICE,
                            (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                             errmsg("get error on %s -- %s", keys_to_use[next_key], kcecodename(kcdbecode(main_db)))));
                }
            }
        } else {
            ereport(NOTICE,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("prefix error on %s -- %s", prefixes_to_use[i], kcecodename(kcdbecode(main_db)))));
        }
    }

    pfree(keys_to_use);
    
    // Done!
    kcdbendtran (main_db, 1);
    if (!kcdbclose(main_db)) {
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
                 errmsg("Error Closeing db: \"%s\"", kcecodename(kcdbecode(main_db)))));
    }

    PG_RETURN_INT64(num_keys_deleted);
}