You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

57 KiB

None <html> <head> </head>

Benchmark: ClickHouse Vs. InfluxDB Vs. Postgresql Vs. Parquet


How to use:

  • Rename the file "properties-model.ini" to "properties.ini"
  • Fill with your own credentials

The purpose of this work is to compare the speed of reading/writing a medium-sized dataset (9 columns and 50.000 lines) to four different databases:

  • ClickHouse
  • InfluxDB
  • Postgresql
  • Parquet (in a S3 Minio Storage)
    ToDo:
  • DuckDB with Polars
  • MongoDB
  • Kdb+

Deve-se relevar: é uma "cold-storage" ou "freeze-storage"?
influxdb: alta leitura e possui a vantagem da indexação para visualização de dados em gráficos.

notas:

  • comparar tamanho do csv com parquet

Imports

In [11]:
import configparser
import io
import time
import timeit
from datetime import datetime

import duckdb
import influxdb_client
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# import pymongo
from clickhouse_driver import Client
from dotenv import load_dotenv
from minio import Minio
from minio.error import S3Error
from pymongo import MongoClient
from pytz import timezone
from sqlalchemy import create_engine

load_dotenv()
Out[11]:
False
In [12]:
# Variables
dbname = "EURUSDtest"
In [13]:
arq = configparser.RawConfigParser()
arq.read("properties.ini")
ClickHouseUser = arq.get("CLICKHOUSE", "user")
ClickHouseKey = arq.get("CLICKHOUSE", "key")
ClickHouseUrl = arq.get("CLICKHOUSE", "url")

InfluxDBUser = arq.get("INFLUXDB", "user")
InfluxDBKey = arq.get("INFLUXDB", "key")
InfluxDBUrl = arq.get("INFLUXDB", "url")
InfluxDBBucket = arq.get("INFLUXDB", "bucket")

PostgresqlUser = arq.get("POSTGRESQL", "user")
PostgresqlKey = arq.get("POSTGRESQL", "key")
PostgresqlUrl = arq.get("POSTGRESQL", "url")
PostgresqlDB = arq.get("POSTGRESQL", "database")

S3MinioUser = arq.get("S3MINIO", "user")
S3MinioKey = arq.get("S3MINIO", "key")
S3MinioUrl = arq.get("S3MINIO", "url")
S3MinioRegion = arq.get("S3MINIO", "region")

MongoUser = arq.get("MONGODB", "user")
MongoKey = arq.get("MONGODB", "key")
MongoUrl = arq.get("MONGODB", "url")
In [14]:
# %%time
# Load Dataset
df = pd.read_csv("out.csv", index_col=0)
df.head()
Out[14]:
id from at to open close min max volume
0 7730801 2023-01-02 15:58:45 1672675140000000000 2023-01-02 15:59:00 1.065995 1.066035 1.065930 1.066070 57
1 7730802 2023-01-02 15:59:00 1672675155000000000 2023-01-02 15:59:15 1.066055 1.066085 1.066005 1.066115 52
2 7730803 2023-01-02 15:59:15 1672675170000000000 2023-01-02 15:59:30 1.066080 1.066025 1.066025 1.066110 57
3 7730804 2023-01-02 15:59:30 1672675185000000000 2023-01-02 15:59:45 1.065980 1.065985 1.065885 1.066045 64
4 7730805 2023-01-02 15:59:45 1672675200000000000 2023-01-02 16:00:00 1.065975 1.066055 1.065830 1.066055 50
In [ ]:
df["from"] = pd.to_datetime(df["from"], unit="s")
df["to"] = pd.to_datetime(df["to"], unit="s")
# Optional use when not transoformed yet
# Transform Datetime

Funçoes

-> Class

In [ ]:
def timestamp2dataHora(x, timezone_="America/Sao_Paulo"):
    d = datetime.fromtimestamp(x, tz=timezone(timezone_))
    return d

ClickHouse

In [15]:
# !! driver tcp.
def cHouseConnect():
    client = Client(
        host=ClickHouseUrl,
        user=ClickHouseUser,
        password=ClickHouseKey,
        settings={"use_numpy": True},
    )
    return client


# Create Tables in ClickHouse
# !! ALTERAR TIPOS !!
# ENGINE: 'Memory' desaparece quando server é reiniciado
def cHouseCreateDb(databasename):
    client = cHouseConnect()
    client.execute(
        "CREATE TABLE IF NOT EXISTS {} (id UInt32,"
        "from DateTime, at UInt64, to DateTime, open Float64,"
        "close Float64, min Float64, max  Float64, volume UInt32)"
        "ENGINE MergeTree ORDER BY to".format(databasename)
    )
    client.disconnect()
    return "Database created"


# Write dataframe to db
def cHouseInsertDf(dbName, dataframe):
    client = cHouseConnect()
    client.insert_dataframe("INSERT INTO {} VALUES".format(dbName), dataframe)
    client.disconnect()
    return " dataframe {} inserted in clickhouse database".format(dataframe)


def cHouseQueryDf(databaseName):
    client = cHouseConnect()
    dfQuery = client.query_dataframe(
        "SELECT * FROM default.{}".format(databaseName)
    )  # LIMIT 10000
    client.disconnect()
    return dfQuery


cHouseCreateDb(dbname)
Failed to connect to 192.168.1.142:9000
Traceback (most recent call last):
  File "/home/sandman/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/connection.py", line 395, in connect
    return self._init_connection(host, port)
  File "/home/sandman/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/connection.py", line 325, in _init_connection
    self.socket = self._create_socket(host, port)
  File "/home/sandman/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/connection.py", line 297, in _create_socket
    raise err
  File "/home/sandman/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/connection.py", line 288, in _create_socket
    sock.connect(sa)
TimeoutError: timed out
---------------------------------------------------------------------------
SocketTimeoutError                        Traceback (most recent call last)
Cell In[15], line 44
     40     client.disconnect()
     41     return dfQuery
---> 44 cHouseCreateDb(dbname)

Cell In[15], line 17, in cHouseCreateDb(databasename)
     15 def cHouseCreateDb(databasename):
     16     client = cHouseConnect()
---> 17     client.execute(
     18         "CREATE TABLE IF NOT EXISTS {} (id UInt32,"
     19         "from DateTime, at UInt64, to DateTime, open Float64,"
     20         "close Float64, min Float64, max  Float64, volume UInt32)"
     21         "ENGINE MergeTree ORDER BY to".format(databasename)
     22     )
     23     client.disconnect()
     24     return "Database created"

File ~/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/client.py:361, in Client.execute(self, query, params, with_column_types, external_tables, query_id, settings, types_check, columnar)
    318 """
    319 Executes query.
    320 
   (...)
    356               and types.
    357 """
    359 start_time = time()
--> 361 with self.disconnect_on_error(query, settings):
    362     # INSERT queries can use list/tuple/generator of list/tuples/dicts.
    363     # For SELECT parameters can be passed in only in dict right now.
    364     is_insert = isinstance(params, (list, tuple, types.GeneratorType))
    366     if is_insert:

File ~/dev/anaconda3/lib/python3.10/contextlib.py:135, in _GeneratorContextManager.__enter__(self)
    133 del self.args, self.kwds, self.func
    134 try:
--> 135     return next(self.gen)
    136 except StopIteration:
    137     raise RuntimeError("generator didn't yield") from None

File ~/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/client.py:305, in Client.disconnect_on_error(self, query, settings)
    302 @contextmanager
    303 def disconnect_on_error(self, query, settings):
    304     try:
--> 305         self.establish_connection(settings)
    307         yield
    309         self.track_current_database(query)

File ~/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/client.py:292, in Client.establish_connection(self, settings)
    290     self.connection = self.get_connection()
    291     self.make_query_settings(settings)
--> 292     self.connection.force_connect()
    293     self.last_query = QueryInfo()
    295 except (errors.SocketTimeoutError, errors.NetworkError):

File ~/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/connection.py:254, in Connection.force_connect(self)
    251 self.check_query_execution()
    253 if not self.connected:
--> 254     self.connect()
    256 elif not self.ping():
    257     logger.warning('Connection was closed, reconnecting.')

File ~/dev/pipenv/lib/python3.10/site-packages/clickhouse_driver/connection.py:416, in Connection.connect(self)
    413     self.hosts.rotate(-1)
    415 if err is not None:
--> 416     raise err

SocketTimeoutError: Code: 209. (192.168.1.142:9000)
In [47]:
# Insert to db and benchmark time
start = timeit.default_timer()
cHouseInsertDf(dbname, df)
stop = timeit.default_timer()
cHouse_write_execution_time = stop - start
In [48]:
# read from db and benchmark time
start = timeit.default_timer()
dfCh = cHouseQueryDf(dbname)
stop = timeit.default_timer()
cHouse_read_execution_time = stop - start
In [49]:
dfCh.head()
Out[49]:
id from at to open close min max volume
0 7730801 2023-01-02 15:58:45 1672675140000000000 2023-01-02 15:59:00 1.065995 1.066035 1.065930 1.066070 57
1 7730801 2023-01-02 15:58:45 1672675140000000000 2023-01-02 15:59:00 1.065995 1.066035 1.065930 1.066070 57
2 7730802 2023-01-02 15:59:00 1672675155000000000 2023-01-02 15:59:15 1.066055 1.066085 1.066005 1.066115 52
3 7730802 2023-01-02 15:59:00 1672675155000000000 2023-01-02 15:59:15 1.066055 1.066085 1.066005 1.066115 52
4 7730803 2023-01-02 15:59:15 1672675170000000000 2023-01-02 15:59:30 1.066080 1.066025 1.066025 1.066110 57
In [50]:
print(cHouse_read_execution_time)
5.175396532999002
In [51]:
print(cHouse_write_execution_time)
6.163630739996734
In [52]:
%%time
dfCh = cHouseQueryDf(dbname)
CPU times: user 1.15 s, sys: 216 ms, total: 1.36 s
Wall time: 5.24 s

InfluxDB

In [ ]:
client = influxdb_client.InfluxDBClient(
    url=InfluxDBUrl, token=InfluxDBKey, org=InfluxDBUser
)
In [ ]:
# Read data from CSV without index and parse 'TimeStamp' as date.
df = pd.read_csv("out.csv", sep=",", index_col=False, parse_dates=["from"])
# Set 'TimeStamp' field as index of dataframe # test another indexs
df.set_index("from", inplace=True)
In [ ]:
df.head()
In [ ]:
%%time
# gravando... demorou... mas deu certo
with client.write_api() as writer:
    writer.write(
        bucket=InfluxDBBucket,
        record=df,
        data_frame_measurement_name="id",
        data_frame_tag_columns=["volume"],
    )
In [ ]:
# data
#   |> pivot(
#     rowKey:["_time"],
#     columnKey: ["_field"],
#     valueColumn: "_value"
#   )
In [ ]:
# Read

Postgresql

In [23]:
# Connect / Create Tables
def psqlConnect():
    engine = create_engine(
        "postgresql+psycopg2://{}:{}@{}:5432/{}".format(
            PostgresqlUser, PostgresqlKey, PostgresqlUrl, PostgresqlDB
        )
    )
    return engine


psqlConnect()
# testar função
  Cell In[23], line 4
    "postgresql+psycopg2://{}:{}@{}:5432/{}".format(
    ^
SyntaxError: invalid syntax. Perhaps you forgot a comma?
In [ ]:

In [26]:
# Drop old table and create new empty table
def psqlCreateTables(databaseName):
    engine = psqlConnect()
    df.head(0).to_sql(databaseName, engine, if_exists="replace", index=False)
    # disconnect()
    return 0


psqlCreateTables(dbname)
# testar função
In [27]:
%%time
# Write
conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep="\t", header=False, index=False)
output.seek(0)
contents = output.getvalue()

cur.copy_from(output, "comparedbs")  # , null="")  # null values become ''
conn.commit()
cur.close()
conn.close()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File <timed exec>:2

AttributeError: 'int' object has no attribute 'raw_connection'
In [ ]:

In [ ]:
start = time.time()
# %%time
# Read
df = pd.read_sql_query('select * from "comparedbs"', con=engine)
end = time.time()
postgresql_read_time = exec_time(start, end)
In [ ]:
print(postgresql_read_time)
In [ ]:
df.head()

S3 Parquet

In [ ]:
# fazer sem funçao para ver se melhora
# verifique se esta no ssd os arquivos da pasta git
def main():
    client = Minio(
        S3MinioUrl,
        secure=False,
        region=S3MinioRegion,
        access_key="MatMPA7NyHltz7DQ",
        secret_key="SO1IG5iBPSjNPZanYUaHCLcoSbjphLCP",
    )

    # Make bucket if not exist.
    found = client.bucket_exists("data")
    if not found:
        client.make_bucket("data")
    else:
        print("Bucket 'data' already exists")

    # Upload
    client.fput_object(
        "data",
        "data.parquet",
        "data/data.parquet",
    )
    # print(
    #     "'data/data.parquet' is successfully uploaded as "
    #     "object 'data.parquet' to bucket 'data'."
    # )
In [ ]:
%%time
df.to_parquet("data/data.parquet")
if __name__ == "__main__":
    try:
        main()
    except S3Error as exc:
        print("error occurred.", exc)
In [ ]:
pq = pd.read_parquet("data/data.parquet", engine="pyarrow")
pq.head()

MongoDB

In [ ]:
# Load csv dataset
data = pd.read_csv("out.csv")
In [ ]:
# Connect to MongoDB
client = MongoClient(
    # "mongodb://192.168.1.133:27017"
    "mongodb://{}:{}@{}/EURUSDtest?retryWrites=true&w=majority".format(
        MongoUser, MongoKey, MongoUrl
    ),
    authSource="admin",
)
In [ ]:
db = client["EUROUSDtest"]
collection = db["finance"]
# data.reset_index(inplace=True)
data_dict = data.to_dict("records")
In [ ]:
%%time
# Insert collection
collection.insert_many(data_dict)
In [ ]:
# read

DuckDB

In [ ]:
cursor = duckdb.connect()
print(cursor.execute("SELECT 42").fetchall())
In [ ]:
%%time
conn = duckdb.connect()
data = pd.read_csv("out.csv")
conn.register("EURUSDtest", data)
In [ ]:
display(conn.execute("SHOW TABLES").df())
In [ ]:
%%time
df = conn.execute("SELECT * FROM EURUSDtest").df()
df

Kdb+

In [ ]:
import numpy as np

np.bool = np.bool_
from qpython import qconnection
In [ ]:
# read csv
data = pd.read_csv("out.csv")
In [ ]:
# open connection
q = qconnection.QConnection(host="localhost", port=5001)
q.open()
In [ ]:
%%time
# send df to kd+ in memory bank
q.sendSync("{t::x}", data)
In [ ]:
# write to on disk table
q.sendSync("`:/home/sandman/q/tab1 set t")
In [ ]:
%%time
# read from on disk table
df2 = q.sendSync("tab2: get `:/home/sandman/q/tab1")
In [ ]:
# print(df2)
In [ ]:
%%time
# load to variable df2
df2 = q.sendSync("tab2")
In [ ]:
# df2(type)
In [ ]:
%%time
# converto to dataframe
df = pd.DataFrame(q("t"))  # , pandas=True))
df.head()
In [ ]:
%%time
# select
df3 = q.sendSync("select from t")
In [ ]:
q.close()

Graph

In [66]:
x = np.arange(1)
width = 0.40
y1 = [cHouse_read_execution_time]
y2 = [cHouse_write_execution_time]
plt.bar(x - 0.2, y1, width)
plt.bar(x + 0.2, y2, width)
plt.xticks(x, ["Click House"])
plt.xlabel("Databases")
plt.ylabel("Seconds")
plt.legend(["Read", "Write"])  # ver
plt.show()
In [ ]:

</html>