You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

79 KiB

None <html> <head> </head>

Benchmark: ClickHouse Vs. InfluxDB Vs. Postgresql Vs. Parquet


How to use:

  • Rename the file "properties-model.ini" to "properties.ini"
  • Fill with your own credentials

The proposal of this work is to compare the speed in read/writing a midle level of data ( a dataset with 9 columns and 50.000 lines) to four diferent databases:

  • ClickHouse
  • InfluxDB
  • Postgresql
  • Parquet (in a S3 Minio Storage)
    ToDo:
  • DuckDB with Polars
  • MongoDB
  • Kdb+

Deve-se relevar: é uma "cold-storage" ou "frezze-storage"?
influxdb: alta leitura e possui a vantagem da indexaçõa para vizualização de dados em gráficos.

notas:

  • comparar tamanho do csv com parquet

Imports

In [11]:
import configparser
import io
import timeit
from datetime import datetime

import duckdb

# from influxdb_client import InfluxDBClient
import influxdb_client
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pdmongo as pdm
from clickhouse_driver import Client
from dotenv import load_dotenv

# from influxdb_client.client.write_api import SYNCHRONOUS
from minio import Minio
from pymongo import MongoClient
from pytz import timezone
from qpython import qconnection
from sqlalchemy import create_engine

load_dotenv()
Out[11]:
False
In [2]:
# Variables
dbname = "EURUSDtest"
collection = "finance"
In [3]:
arq = configparser.RawConfigParser()
arq.read("properties.ini")
ClickHouseUser = arq.get("CLICKHOUSE", "user")
ClickHouseKey = arq.get("CLICKHOUSE", "key")
ClickHouseUrl = arq.get("CLICKHOUSE", "url")

InfluxDBUser = arq.get("INFLUXDB", "user")
InfluxDBKey = arq.get("INFLUXDB", "key")
InfluxDBUrl = arq.get("INFLUXDB", "url")
InfluxDBBucket = arq.get("INFLUXDB", "bucket")

PostgresqlUser = arq.get("POSTGRESQL", "user")
PostgresqlKey = arq.get("POSTGRESQL", "key")
PostgresqlUrl = arq.get("POSTGRESQL", "url")
PostgresqlDB = arq.get("POSTGRESQL", "database")

S3MinioUser = arq.get("S3MINIO", "user")
S3MinioKey = arq.get("S3MINIO", "key")
S3MinioUrl = arq.get("S3MINIO", "url")
S3MinioRegion = arq.get("S3MINIO", "region")

MongoUser = arq.get("MONGODB", "user")
MongoKey = arq.get("MONGODB", "key")
MongoUrl = arq.get("MONGODB", "url")
In [4]:
# %%time
# Load Dataset
df = pd.read_csv("out.csv", index_col=0)
df.tail()
Out[4]:
id from at to open close min max volume
999995 7984748 2023-03-03 18:13:30 1677867225000000000 2023-03-03 18:13:45 1.062695 1.062635 1.062630 1.062700 64
999996 7984749 2023-03-03 18:13:45 1677867240000000000 2023-03-03 18:14:00 1.062645 1.062650 1.062625 1.062650 43
999997 7984750 2023-03-03 18:14:00 1677867255000000000 2023-03-03 18:14:15 1.062640 1.062625 1.062620 1.062665 47
999998 7984751 2023-03-03 18:14:15 1677867270000000000 2023-03-03 18:14:30 1.062625 1.062535 1.062535 1.062645 43
999999 7984752 2023-03-03 18:14:30 1677867285000000000 2023-03-03 18:14:45 1.062535 1.062520 1.062520 1.062580 59
In [ ]:
df["from"] = pd.to_datetime(df["from"], unit="s")
df["to"] = pd.to_datetime(df["to"], unit="s")
# Optional use when not transoformed yet
# Transform Datetime

Funçoes

-> Class

In [5]:
def timestamp2dataHora(x, timezone_="America/Sao_Paulo"):
    d = datetime.fromtimestamp(x, tz=timezone(timezone_))
    return d

ClickHouse

In [8]:
# !! driver tcp.
def cHouseConnect():
    client = Client(
        host=ClickHouseUrl,
        user=ClickHouseUser,
        password=ClickHouseKey,
        settings={"use_numpy": True},
    )
    return client


# Create Tables in ClickHouse
# !! ALTERAR TIPOS !!
# ENGINE: 'Memory' desaparece quando server é reiniciado
def cHouseCreateDb(databasename):
    client = cHouseConnect()
    client.execute(
        "CREATE TABLE IF NOT EXISTS {} (id UInt32,"
        "from DateTime, at UInt64, to DateTime, open Float64,"
        "close Float64, min Float64, max  Float64, volume UInt32)"
        "ENGINE MergeTree ORDER BY to".format(databasename)
    )
    client.disconnect()
    return "Database created"


# Write dataframe to db
def cHouseInsertDf(dbName, dataframe):
    client = cHouseConnect()
    client.insert_dataframe("INSERT INTO {} VALUES".format(dbName), dataframe)
    client.disconnect()
    return " dataframe {} inserted in clickhouse database".format(dataframe)


def cHouseQueryDf(databaseName):
    client = cHouseConnect()
    dfQuery = client.query_dataframe("SELECT * FROM default.{}".format(databaseName))
    client.disconnect()
    return dfQuery


cHouseCreateDb(dbname)
Out[8]:
'Database created'
In [9]:
# Insert to db and benchmark time
start = timeit.default_timer()
cHouseInsertDf(dbname, df)
stop = timeit.default_timer()
cHouse_write_execution_time = stop - start
In [10]:
# read from db and benchmark time
start = timeit.default_timer()
dfCh = cHouseQueryDf(dbname)
stop = timeit.default_timer()
cHouse_read_execution_time = stop - start
In [11]:
dfCh.tail()
Out[11]:
id from at to open close min max volume
999995 7857774 2023-02-01 17:06:00 1675271175000000000 2023-02-01 17:06:15 1.091725 1.091670 1.09166 1.09176 84
999996 7857775 2023-02-01 17:06:15 1675271190000000000 2023-02-01 17:06:30 1.091680 1.091660 1.09165 1.09168 51
999997 7857775 2023-02-01 17:06:15 1675271190000000000 2023-02-01 17:06:30 1.091680 1.091660 1.09165 1.09168 51
999998 7857776 2023-02-01 17:06:30 1675271205000000000 2023-02-01 17:06:45 1.091660 1.091655 1.09164 1.09168 63
999999 7857776 2023-02-01 17:06:30 1675271205000000000 2023-02-01 17:06:45 1.091660 1.091655 1.09164 1.09168 63
In [12]:
print(cHouse_read_execution_time)
5.7032926019999195
In [13]:
print(cHouse_write_execution_time)
6.242225094000105
In [28]:
# %%time
# dfCh = cHouseQueryDf(dbname)
CPU times: user 2.21 s, sys: 383 ms, total: 2.6 s
Wall time: 10.7 s

InfluxDB

In [33]:
query = """
from(bucket: "EURUSDtest")
|> range(start:2023-03-03T18:14:30Z, stop: now())
|> filter(fn: (r) => r._measurement == "id")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")"""
In [34]:
def influxdbConnect():
    client = influxdb_client.InfluxDBClient(
        url=InfluxDBUrl, token=InfluxDBKey, org=InfluxDBUser
    )
    return client


def influxdbLoadCsv(csv="out.csv", dictDates=["from", "to"], index="from"):
    # Read data from CSV without index and parse 'TimeStamp' as date.
    df = pd.read_csv(csv, sep=",", index_col=False, parse_dates=dictDates)
    # Set 'TimeStamp' field as index of dataframe # test another indexs
    df.set_index(index, inplace=True)
    return df


def influxdbWriteCsv(dataFrame, bucket, measurement="id", tag="volume"):
    client = influxdbConnect()
    # write_options=SYNCHRONOUS
    with client.write_api() as writer:
        writer.write(
            bucket=bucket,
            record=dataFrame,
            data_frame_measurement_name=measurement,
            data_frame_tag_columns=[tag],
        )
    writer.__del__()
    client.__del__()
    return 0


def influxdRead(org=InfluxDBUrl, query=query):
    client = influxdbConnect()
    InfluxDf = client.query_api().query_data_frame(org="librography", query=query)
    # display(InfluxDf.head())
    return InfluxDf
In [96]:
dafr = influxdbLoadCsv()
# dafr.head()
In [ ]:
start = timeit.default_timer()
influxdbWriteCsv(dafr, InfluxDBBucket)
# dfIdw = cHouseQueryDf(dbname)
stop = timeit.default_timer()
influxdb_write_execution_time = stop - start
In [119]:
print(influxdb_write_execution_time)
161.51344409900048
In [120]:
# read from db and benchmark time
start = timeit.default_timer()
influxdRead()
stop = timeit.default_timer()
influxdb_read_execution_time = stop - start
result table _start _stop _time _measurement volume Unnamed: 0 at close id max min open to
0 _result 0 2023-03-03 18:14:30+00:00 2023-06-17 02:47:50.721233+00:00 2023-03-05 22:01:00+00:00 id 0 115589 1678053675000000000 1.063425 7985654 1.063425 1.063425 1.063425 2023-03-05 22:01:15
1 _result 0 2023-03-03 18:14:30+00:00 2023-06-17 02:47:50.721233+00:00 2023-03-05 23:58:30+00:00 id 0 116059 1678060725000000000 1.062595 7986124 1.062595 1.062595 1.062595 2023-03-05 23:58:45
2 _result 0 2023-03-03 18:14:30+00:00 2023-06-17 02:47:50.721233+00:00 2023-03-06 23:58:30+00:00 id 0 121819 1678147125000000000 1.068615 7991884 1.068615 1.068615 1.068615 2023-03-06 23:58:45
3 _result 0 2023-03-03 18:14:30+00:00 2023-06-17 02:47:50.721233+00:00 2023-03-06 23:59:30+00:00 id 0 121823 1678147185000000000 1.068605 7991888 1.068605 1.068605 1.068605 2023-03-06 23:59:45
4 _result 0 2023-03-03 18:14:30+00:00 2023-06-17 02:47:50.721233+00:00 2023-03-08 23:59:00+00:00 id 0 182493 1678319955000000000 1.054895 8003406 1.054895 1.054895 1.054895 2023-03-08 23:59:15
In [121]:
print(influxdb_read_execution_time)
181.9998163249984
In [19]:
df.tail()
Out[19]:
Unnamed: 0 id at to open close min max volume
from
2023-03-03 18:13:30 999995 7984748 1677867225000000000 2023-03-03 18:13:45 1.062695 1.062635 1.062630 1.062700 64
2023-03-03 18:13:45 999996 7984749 1677867240000000000 2023-03-03 18:14:00 1.062645 1.062650 1.062625 1.062650 43
2023-03-03 18:14:00 999997 7984750 1677867255000000000 2023-03-03 18:14:15 1.062640 1.062625 1.062620 1.062665 47
2023-03-03 18:14:15 999998 7984751 1677867270000000000 2023-03-03 18:14:30 1.062625 1.062535 1.062535 1.062645 43
2023-03-03 18:14:30 999999 7984752 1677867285000000000 2023-03-03 18:14:45 1.062535 1.062520 1.062520 1.062580 59

Postgresql

In [24]:
# Connect / Create Tables
def psqlConnect():
    engine = create_engine(
        "postgresql+psycopg2://{}:{}@{}:5432/{}".format(
            PostgresqlUser, PostgresqlKey, PostgresqlUrl, PostgresqlDB
        )
    )
    return engine


psqlConnect()
# testar função
Out[24]:
Engine(postgresql+psycopg2://postgres:***@192.168.1.133:5432/postgres)
In [25]:
# Drop old table and create new empty table
def psqlCreateTables(databaseName):
    engine = psqlConnect()
    df.head(0).to_sql(databaseName, engine, if_exists="replace", index=False)
    # Write
    conn = engine.raw_connection()
    cur = conn.cursor()
    output = io.StringIO()
    df.to_csv(output, sep="\t", header=False, index=False)
    output.seek(0)
    contents = output.getvalue()

    cur.copy_from(output, "comparedbs")  # , null="")  # null values become ''
    conn.commit()
    cur.close()
    conn.close()
    # disconnect()
    return 0


# funcao read sql
def psqlReadTables():
    engine = psqlConnect()
    df = pd.read_sql_query('select * from "comparedbs"', con=engine)
    return df


# testar função
In [26]:
# Insert to db and benchmark time
start = timeit.default_timer()
psqlCreateTables(dbname)
stop = timeit.default_timer()
psql_write_execution_time = stop - start
In [27]:
start = timeit.default_timer()
psqlReadTables()
stop = timeit.default_timer()
psql_read_execution_time = stop - start
In [29]:
print(psql_read_execution_time)
126.40752380799677
In [ ]:
# df.head()

S3 Parquet

In [15]:
# fazer sem funçao para ver se melhora
# verifique se esta no ssd os arquivos da pasta git
def s3Connect():
    client = Minio(
        S3MinioUrl,
        secure=False,
        region=S3MinioRegion,
        access_key=S3MinioUser,
        secret_key=S3MinioKey,
    )
    return client


def s3CreateBucket(bucketName="data"):
    client = s3Connect()
    found = client.bucket_exists(bucketName)
    if not found:
        return client.make_bucket(bucketName)
    else:
        return "Bucket '{}' already exists".format(bucketName)


def s3uploadCsv():
    client = s3Connect()
    client.fput_object(
        "data",
        "data.parquet",
        "data/data.parquet",
    )
    return (
        "'data/data.parquet' is successfully uploaded as "
        "object 'data.parquet' to bucket 'data'."
    )
In [13]:
# Insert to db and benchmark time
df.to_parquet("data/data.parquet")
s3CreateBucket()
start = timeit.default_timer()
s3uploadCsv()
stop = timeit.default_timer()
s3_write_execution_time = stop - start
In [ ]:
# falta read (parquet to df)
In [14]:
print(s3_write_execution_time)
3.8389689489995362
In [16]:
start = timeit.default_timer()
pq = pd.read_parquet("data/data.parquet", engine="pyarrow")
stop = timeit.default_timer()
s3_read_execution_time = stop - start
In [17]:
pq.head()
Out[17]:
id from at to open close min max volume
0 7730801 2023-01-02 15:58:45 1672675140000000000 2023-01-02 15:59:00 1.065995 1.066035 1.065930 1.066070 57
1 7730802 2023-01-02 15:59:00 1672675155000000000 2023-01-02 15:59:15 1.066055 1.066085 1.066005 1.066115 52
2 7730803 2023-01-02 15:59:15 1672675170000000000 2023-01-02 15:59:30 1.066080 1.066025 1.066025 1.066110 57
3 7730804 2023-01-02 15:59:30 1672675185000000000 2023-01-02 15:59:45 1.065980 1.065985 1.065885 1.066045 64
4 7730805 2023-01-02 15:59:45 1672675200000000000 2023-01-02 16:00:00 1.065975 1.066055 1.065830 1.066055 50
In [18]:
print(s3_read_execution_time)
0.5066086639999412

MongoDB

In [71]:
port = "27017"


def mongoLoadCsv(csvfile):
    data = pd.read_csv("out.csv")
    return data


def mongoConnect():
    client = MongoClient(
        "mongodb://{}:{}@{}/EURUSDtest?retryWrites=true&w=majority".format(
            MongoUser, MongoKey, MongoUrl
        ),
        authSource="admin",
    )
    return client


def mongoWriteDict(dados, dbs, collection):
    client = mongoConnect()
    db = client[dbs]
    collection = db[collection]
    # data.reset_index(inplace=True)
    data_dict = dados.to_dict("records")
    # return data_dict
    collection.insert_many(data_dict)
    return 0


def mongoRead():
    df = pdm.read_mongo(
        collection, [], "mongodb://192.168.1.162:27017/{}".format(dbname)
    )
    return df
In [ ]:
data = mongoLoadCsv("out.csv")
data.head()
In [51]:
start = timeit.default_timer()
dfCh = mongoWriteDict(data, dbname, "finance")
stop = timeit.default_timer()
mongo_write_execution_time = stop - start
In [52]:
print(mongo_write_execution_time)
46.76343438199547
In [72]:
start = timeit.default_timer()
dfMongo = mongoRead()
stop = timeit.default_timer()
mongo_read_execution_time = stop - start
In [73]:
print(mongo_read_execution_time)
56.66832709600567

DuckDB

In [85]:
# https://duckdb.org/2022/07/27/art-storage.html


def duckdbConnect():
    cursor = duckdb.connect()
    return cursor


def duckdbLoadCsv(csvFile="out.csv"):
    data = pd.read_csv("out.csv")
    return data


# write
def duckdbWrite(file, dataframe, tableName):
    conn = duckdbConnect()
    conn = duckdb.connect(file)
    # conn.execute("DROP TABLE EURUSDtest")
    # conn.sql('CREATE TABLE EURUSDtest(i INTEGER)')
    conn.register("tempTable", dataframe)
    conn.execute("CREATE TABLE {} AS SELECT * FROM tempTable".format(tableName))
    conn.close()
    return 0


def duckdbRead(dbfile):
    conn = duckdbConnect()
    conn = duckdb.connect(dbfile)
    conn.execute("SELECT * FROM {}".format(dbname)).fetchdf()
    conn.close()
    return 0
In [73]:
data = duckdbLoadCsv()
In [ ]:
conn = duckdbConnect()
conn.execute("DROP TABLE EURUSD")
In [74]:
conn.register("EURUSDtest", data)
Out[74]:
<duckdb.DuckDBPyConnection at 0x7f925fb29b30>
In [84]:
conn = duckdb.connect("file.db")
# conn.execute("DROP TABLE EURUSDtest")
# conn.sql("CREATE TABLE EURUSDtest(i INTEGER)")
display(conn.execute("SHOW TABLES").df())
name
0 EURUSDtest
In [ ]:
# conn = duckdbConnect()
conn.execute("SELECT * FROM EURUSDtest")
conn.fetchall()
In [82]:
start = timeit.default_timer()
dfDuckdb = duckdbWrite("file.db", data, dbname)
stop = timeit.default_timer()
duckdb_write_execution_time = stop - start
In [87]:
print(duckdb_write_execution_time)
1.7639581979965442
In [86]:
start = timeit.default_timer()
dfDuckdb = duckdbRead("file.db")
stop = timeit.default_timer()
duckdb_read_execution_time = stop - start
In [88]:
print(duckdb_read_execution_time)
0.5125257010004134

Kdb+

In [31]:
# numpy version boolean fix
np.bool = np.bool_
In [25]:
# functions
# cd ~ && q/l64/q -p 5001


def kdbConnect():
    q = qconnection.QConnection(host="localhost", port=5001)
    q.open()
    return q


def kdbLoadCsv(file="out.csv"):
    data = pd.read_csv(file)
    return data


def kdbWrite():
    q = kdbConnect()
    data = kdbLoadCsv()
    q.sendSync("{t::x}", data)
    q.sendSync("`:/home/sandman/q/tab1 set t")
    q.close()
    return 0


def kdbRead():
    q = kdbConnect()
    # df2 = q.sendSync("tab2: get `:/home/sandman/q/tab1")
    # df2 = q.sendSync("tab2")
    df = pd.DataFrame(q("t"))  # , pandas=True))
    # df3 = q.sendSync("select from t")
    # ver todos esses loads
    q.close()
    return 0
In [28]:
# write
start = timeit.default_timer()
dfKdb = kdbWrite()
stop = timeit.default_timer()
kdb_write_execution_time = stop - start
In [29]:
# print write time
print(kdb_write_execution_time)
2.8739770100000896
In [30]:
# read
start = timeit.default_timer()
dfKdb = kdbRead()
stop = timeit.default_timer()
kdb_read_execution_time = stop - start
In [ ]:
# print read time
print(kdb_read_execution_time)
4.153738381999574
In [46]:
s = " sec"
data = [
    [
        "{:.2f}".format(kdb_write_execution_time) + s,
        "{:.2f}".format(kdb_read_execution_time) + s,
        "{:.2f}".format(kdb_write_execution_time + kdb_read_execution_time) + s,
    ],
    ["fill", 15, 0],
    ["fill", 14, 0],
]

index_labels=['Kdb+','r2','r3']
# Create the pandas DataFrame
df = pd.DataFrame(data, columns=["Write Time", "Read Time", "Total Time"],index=index_labels)

# print dataframe.
df
Out[46]:
Write Time Read Time Total Time
Kdb+ 2.87 sec 4.15 sec 7.03 sec
r2 fill 15 0
r3 fill 14 0

Graph

In [30]:
x = np.arange(3)  # change here
width = 0.40
y1 = [
    cHouse_read_execution_time,
    psql_read_execution_time,
    s3_read_execution_time,
]  # change here
y2 = [
    cHouse_write_execution_time,
    psql_write_execution_time,
    s3_write_execution_time,
]  # change here
plt.bar(x - 0.2, y1, width)
plt.bar(x + 0.2, y2, width)
plt.xticks(x, ["Click House", "Postgresql", "S3 Parquet"])
plt.xlabel("Databases")
plt.ylabel("Seconds")
plt.legend(["Read", "Write"])  # ver
plt.show()
In [ ]:

</html>