You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
65 KiB
65 KiB
None
<html>
<head>
</head>
</html>
Benchmark: ClickHouse Vs. InfluxDB Vs. Postgresql Vs. Parquet¶
How to use:¶
- Rename the file "properties-model.ini" to "properties.ini"
- Fill with your own credentials
The purpose of this work is to compare read/write speed for a mid-sized dataset (9 columns and 50,000 rows) across four different databases:
- ClickHouse
- InfluxDB
- Postgresql
- Parquet (in a S3 Minio Storage)
ToDo: - DuckDB with Polars
- MongoDB
- Kdb+
Deve-se relevar:
é uma "cold-storage" ou "freeze-storage"?
influxdb: alta leitura e possui a vantagem da indexação para visualização de dados em gráficos.
notas:
- comparar tamanho do csv com parquet
Imports¶
In [34]:
import configparser
import io
import timeit
from datetime import datetime
from zoneinfo import ZoneInfo

import duckdb
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from clickhouse_driver import Client
from dotenv import load_dotenv
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from minio import Minio
from pymongo import MongoClient
from pytz import timezone
from sqlalchemy import create_engine
load_dotenv()
Out[34]:
In [2]:
# Variables
# Name of the table/measurement used by every backend in this benchmark.
dbname = "EURUSDtest"
In [94]:
# Load per-backend credentials from properties.ini
# (copy properties-model.ini, rename it, and fill in your own values).
arq = configparser.RawConfigParser()
read_files = arq.read("properties.ini")
# Fail fast with a clear message; otherwise a missing file surfaces later as a
# confusing NoSectionError from the first .get() call below.
if not read_files:
    raise FileNotFoundError(
        "properties.ini not found - rename properties-model.ini and fill it in"
    )
ClickHouseUser = arq.get("CLICKHOUSE", "user")
ClickHouseKey = arq.get("CLICKHOUSE", "key")
ClickHouseUrl = arq.get("CLICKHOUSE", "url")
InfluxDBUser = arq.get("INFLUXDB", "user")
InfluxDBKey = arq.get("INFLUXDB", "key")
InfluxDBUrl = arq.get("INFLUXDB", "url")
InfluxDBBucket = arq.get("INFLUXDB", "bucket")
PostgresqlUser = arq.get("POSTGRESQL", "user")
PostgresqlKey = arq.get("POSTGRESQL", "key")
PostgresqlUrl = arq.get("POSTGRESQL", "url")
PostgresqlDB = arq.get("POSTGRESQL", "database")
S3MinioUser = arq.get("S3MINIO", "user")
S3MinioKey = arq.get("S3MINIO", "key")
S3MinioUrl = arq.get("S3MINIO", "url")
S3MinioRegion = arq.get("S3MINIO", "region")
MongoUser = arq.get("MONGODB", "user")
MongoKey = arq.get("MONGODB", "key")
MongoUrl = arq.get("MONGODB", "url")
In [4]:
# %%time
# Load the benchmark dataset (9 columns, ~50,000 rows) from CSV.
df = pd.read_csv("out.csv", index_col=0)
df.tail()
Out[4]:
In [ ]:
# Optional: convert the Unix-second columns to datetimes.
# Skip this cell if the columns were already transformed.
df["from"] = pd.to_datetime(df["from"], unit="s")
df["to"] = pd.to_datetime(df["to"], unit="s")
Funções¶
-> Class
In [5]:
def timestamp2dataHora(x, timezone_="America/Sao_Paulo"):
    """Convert a Unix timestamp (seconds) to a timezone-aware datetime.

    Parameters
    ----------
    x : float
        Seconds since the Unix epoch.
    timezone_ : str
        IANA timezone name (default: America/Sao_Paulo).

    Returns
    -------
    datetime
        Aware datetime localized to ``timezone_``.
    """
    # Uses stdlib zoneinfo instead of the legacy third-party pytz;
    # the resulting instant and UTC offset are identical.
    return datetime.fromtimestamp(x, tz=ZoneInfo(timezone_))
ClickHouse¶
In [8]:
# NOTE: this is the native TCP driver (clickhouse_driver), not the HTTP interface.
def cHouseConnect():
    """Open a ClickHouse client using credentials loaded from properties.ini."""
    return Client(
        host=ClickHouseUrl,
        user=ClickHouseUser,
        password=ClickHouseKey,
        # use_numpy is required by insert_dataframe / query_dataframe
        settings={"use_numpy": True},
    )
# Create the benchmark table in ClickHouse.
# TODO(review): revisit the column types.
# A 'Memory' engine table would vanish on server restart, hence MergeTree.
def cHouseCreateDb(databasename):
    """Create table `databasename` (if absent) and return a status string."""
    client = cHouseConnect()
    ddl = (
        f"CREATE TABLE IF NOT EXISTS {databasename} (id UInt32,"
        "from DateTime, at UInt64, to DateTime, open Float64,"
        "close Float64, min Float64, max Float64, volume UInt32)"
        "ENGINE MergeTree ORDER BY to"
    )
    client.execute(ddl)
    client.disconnect()
    return "Database created"
# Write a pandas DataFrame into a ClickHouse table.
def cHouseInsertDf(dbName, dataframe):
    """Insert `dataframe` into ClickHouse table `dbName`; return a status string."""
    client = cHouseConnect()
    client.insert_dataframe("INSERT INTO {} VALUES".format(dbName), dataframe)
    client.disconnect()
    # Bug fix: the old message interpolated the entire DataFrame repr;
    # report the target table name instead.
    return " dataframe inserted in clickhouse table {}".format(dbName)
def cHouseQueryDf(databaseName):
    """Read the whole ClickHouse table `databaseName` into a pandas DataFrame."""
    client = cHouseConnect()
    sql = "SELECT * FROM default.{}".format(databaseName)  # append LIMIT here to sample
    result = client.query_dataframe(sql)
    client.disconnect()
    return result
cHouseCreateDb(dbname)
Out[8]:
In [9]:
# Insert to db and benchmark time
start = timeit.default_timer()
cHouseInsertDf(dbname, df)
stop = timeit.default_timer()
cHouse_write_execution_time = stop - start  # seconds; plotted in the Graph section
In [10]:
# read from db and benchmark time
start = timeit.default_timer()
dfCh = cHouseQueryDf(dbname)
stop = timeit.default_timer()
cHouse_read_execution_time = stop - start  # seconds; plotted in the Graph section
In [11]:
dfCh.tail()
Out[11]:
In [12]:
print(cHouse_read_execution_time)
In [13]:
print(cHouse_write_execution_time)
In [28]:
# %%time
# dfCh = cHouseQueryDf(dbname)
InfluxDB¶
In [95]:
def influxdbConnect():
    """Open an InfluxDB client using credentials loaded from properties.ini."""
    # Bug fix: the class is imported as `from influxdb_client import InfluxDBClient`,
    # so the old `influxdb_client.InfluxDBClient(...)` raised NameError.
    return InfluxDBClient(url=InfluxDBUrl, token=InfluxDBKey, org=InfluxDBUser)
def influxdbLoadCsv(csv="out.csv", dictDates=["from", "to"], index="from"):
    """Load the benchmark CSV with parsed date columns, indexed for InfluxDB writes.

    `dictDates` lists the columns parsed as datetimes; `index` becomes the
    DataFrame index (the write API uses it as the point timestamp).
    """
    frame = pd.read_csv(csv, sep=",", index_col=False, parse_dates=dictDates)
    # TODO(review): experiment with other index columns.
    return frame.set_index(index)
def influxdbWriteCsv(dataFrame, bucket, measurement="id", tag="volume"):
    """Write `dataFrame` to InfluxDB `bucket`.

    The frame's index supplies the timestamps; `measurement` names the
    measurement and `tag` the single tag column. Returns 0 on completion.
    """
    client = influxdbConnect()
    try:
        # The `with` block flushes and closes the writer on exit; the old
        # explicit writer.__del__() / client.__del__() calls were redundant
        # and fragile. (write_options=SYNCHRONOUS could be passed here.)
        with client.write_api() as writer:
            writer.write(
                bucket=bucket,
                record=dataFrame,
                data_frame_measurement_name=measurement,
                data_frame_tag_columns=[tag],
            )
    finally:
        client.close()
    return 0
def influxdRead(org=InfluxDBUser, query=None):
    """Run a Flux `query` and return the result as a pandas DataFrame.

    Bug fixes vs. the original:
    - the default ``query=query`` referenced a global defined in a later
      cell, raising NameError when this cell ran first;
    - the ``org`` argument was ignored in favor of a hardcoded value
      ("librography"), and its default was the URL rather than the org.
    """
    client = influxdbConnect()
    return client.query_api().query_data_frame(org=org, query=query)
In [96]:
dafr = influxdbLoadCsv()
# dafr.head()
In [ ]:
# Write the dataframe to InfluxDB and benchmark the elapsed time.
start = timeit.default_timer()
influxdbWriteCsv(dafr, InfluxDBBucket)
# dfIdw = cHouseQueryDf(dbname)
stop = timeit.default_timer()
influxdb_write_execution_time = stop - start
In [119]:
print(influxdb_write_execution_time)
In [113]:
# Flux query: all fields of measurement "id" since a fixed start time,
# pivoted so each field becomes a column.
# NOTE(review): the bucket name is hardcoded — presumably it should match
# InfluxDBBucket from properties.ini; verify.
query = """
from(bucket: "EURUSDtest")
|> range(start:2023-03-03T18:14:30Z, stop: now())
|> filter(fn: (r) => r._measurement == "id")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")"""
# Alternative filters kept from the original exploration:
# |> filter(fn: (r) => r._field == "volume")
# |> filter(fn: (r) => r.cpu == "cpu-total")
In [ ]:
# read from db and benchmark time
start = timeit.default_timer()
dfIdr = cHouseQueryDf(dbname)
stop = timeit.default_timer()
influxdb_read_execution_time = stop - start
In [118]:
print(influxdb_read_execution_time)
In [19]:
df.tail()
Out[19]:
Postgresql¶
In [24]:
# Build the SQLAlchemy engine for the benchmark Postgres database.
def psqlConnect():
    """Return a SQLAlchemy engine connected via psycopg2 (port 5432)."""
    url = "postgresql+psycopg2://{}:{}@{}:5432/{}".format(
        PostgresqlUser, PostgresqlKey, PostgresqlUrl, PostgresqlDB
    )
    return create_engine(url)
# Smoke-test the engine construction (does not open a connection yet).
psqlConnect()
# TODO(review): actually test the function against the server.
Out[24]:
In [25]:
# Drop any old table and bulk-load the global `df` into it via COPY.
def psqlCreateTables(databaseName):
    """(Re)create Postgres table `databaseName` and COPY the global `df` into it.

    Bug fixes vs. the original:
    - the COPY target was hardcoded to "comparedbs", ignoring `databaseName`;
    - the unused `contents` variable was removed;
    - the raw connection is now closed even if the COPY fails.
    """
    engine = psqlConnect()
    # Create an empty table with df's schema, replacing any existing one.
    df.head(0).to_sql(databaseName, engine, if_exists="replace", index=False)
    conn = engine.raw_connection()
    try:
        cur = conn.cursor()
        buffer = io.StringIO()
        df.to_csv(buffer, sep="\t", header=False, index=False)
        buffer.seek(0)
        cur.copy_from(buffer, databaseName)  # pass null="" to map NULLs to ''
        conn.commit()
        cur.close()
    finally:
        conn.close()
    return 0
# Read a whole Postgres table back into a DataFrame.
def psqlReadTables(databaseName="comparedbs"):
    """Return table `databaseName` as a DataFrame.

    Generalized from a hardcoded table name; the default preserves the
    original behavior. `databaseName` is interpolated as an identifier
    (identifiers cannot be bound parameters) — only pass trusted names.
    """
    engine = psqlConnect()
    return pd.read_sql_query('select * from "{}"'.format(databaseName), con=engine)
In [26]:
# Insert to db and benchmark time
start = timeit.default_timer()
psqlCreateTables(dbname)
stop = timeit.default_timer()
psql_write_execution_time = stop - start  # seconds; plotted in the Graph section
In [27]:
# Read the table back and benchmark the elapsed time.
start = timeit.default_timer()
psqlReadTables()
stop = timeit.default_timer()
psql_read_execution_time = stop - start  # seconds; plotted in the Graph section
In [29]:
print(psql_read_execution_time)
In [ ]:
# df.head()
S3 Parquet¶
In [15]:
# TODO(review): try inlining this (no function) to see if it is faster,
# and make sure the git working tree lives on the SSD.
def s3Connect():
    """Open a MinIO client using credentials loaded from properties.ini."""
    return Minio(
        S3MinioUrl,
        secure=False,  # plain HTTP endpoint
        region=S3MinioRegion,
        access_key=S3MinioUser,
        secret_key=S3MinioKey,
    )
def s3CreateBucket(bucketName="data"):
    """Create `bucketName` if missing; otherwise report that it already exists."""
    client = s3Connect()
    if client.bucket_exists(bucketName):
        return "Bucket '{}' already exists".format(bucketName)
    return client.make_bucket(bucketName)
def s3uploadCsv(bucketName="data", objectName="data.parquet", filePath="data/data.parquet"):
    """Upload a local file to MinIO and return a status message.

    Generalized from hardcoded bucket/object/path values; the defaults
    preserve the original behavior exactly.
    """
    client = s3Connect()
    client.fput_object(bucketName, objectName, filePath)
    return (
        "'{}' is successfully uploaded as "
        "object '{}' to bucket '{}'.".format(filePath, objectName, bucketName)
    )
In [13]:
# Write the DataFrame to a local parquet file, then upload it to MinIO.
# Only the upload is timed; the to_parquet conversion happens outside the timer.
df.to_parquet("data/data.parquet")
s3CreateBucket()
start = timeit.default_timer()
s3uploadCsv()
stop = timeit.default_timer()
s3_write_execution_time = stop - start
In [ ]:
# falta read (parquet to df)
In [14]:
print(s3_write_execution_time)
In [16]:
# Benchmark reading the parquet file back into a DataFrame.
# NOTE(review): this times a *local disk* read of data/data.parquet, not a
# download from MinIO — the S3 read path is still missing (see note above).
start = timeit.default_timer()
pq = pd.read_parquet("data/data.parquet", engine="pyarrow")
stop = timeit.default_timer()
s3_read_execution_time = stop - start
In [17]:
pq.head()
Out[17]:
In [18]:
print(s3_read_execution_time)
MongoDB¶
In [ ]:
# Load csv dataset (unparsed; timestamps stay as raw values here)
data = pd.read_csv("out.csv")
In [ ]:
# Connect to MongoDB using credentials from properties.ini.
client = MongoClient(
    # "mongodb://192.168.1.133:27017"
    "mongodb://{}:{}@{}/EURUSDtest?retryWrites=true&w=majority".format(
        MongoUser, MongoKey, MongoUrl
    ),
    authSource="admin",  # credentials are stored in the admin database
)
In [ ]:
# Bug fix: the database name was misspelled "EUROUSDtest"; the connection
# string and every other backend in this notebook use "EURUSDtest".
db = client["EURUSDtest"]
collection = db["finance"]
# data.reset_index(inplace=True)
# Convert the DataFrame to a list of per-row dicts for insert_many.
data_dict = data.to_dict("records")
In [ ]:
%%time
# Insert all records into the collection; %%time reports the write cost.
collection.insert_many(data_dict)
In [ ]:
# read
DuckDB¶
In [ ]:
# Smoke-test the DuckDB in-memory connection.
cursor = duckdb.connect()
print(cursor.execute("SELECT 42").fetchall())
In [ ]:
%%time
# Load the CSV and expose it to DuckDB as a virtual table named EURUSDtest.
conn = duckdb.connect()
data = pd.read_csv("out.csv")
conn.register("EURUSDtest", data)
In [ ]:
display(conn.execute("SHOW TABLES").df())
In [ ]:
%%time
df = conn.execute("SELECT * FROM EURUSDtest").df()
df
Kdb+¶
In [31]:
# Compatibility shim: alias the removed np.bool back to np.bool_ before
# importing qpython — presumably qpython still references np.bool, which
# NumPy dropped in 1.24; confirm against the installed qpython version.
np.bool = np.bool_
from qpython import qconnection
In [32]:
# read csv
data = pd.read_csv("out.csv")
In [ ]:
# Open a connection to the local kdb+ process on port 5001.
q = qconnection.QConnection(host="localhost", port=5001)
q.open()
In [ ]:
%%time
# send df to kbd+ in memory bank
q.sendSync("{t::x}", data)
In [ ]:
# write to on disk table
q.sendSync("`:/home/sandman/q/tab1 set t")
In [ ]:
%%time
# Load the on-disk table back into the q session as `tab2`; %%time reports cost.
# NOTE(review): hardcoded absolute path — make this configurable.
df2 = q.sendSync("tab2: get `:/home/sandman/q/tab1")
In [ ]:
# print(df2)
In [ ]:
%%time
# load to variable df2
df2 = q.sendSync("tab2")
In [ ]:
# df2(type)
In [ ]:
%%time
# converto to dataframe
df = pd.DataFrame(q("t")) # , pandas=True))
df.head()
In [ ]:
%%time
# select
df3 = q.sendSync("select from t")
In [ ]:
q.close()
Graph¶
In [30]:
# Grouped bar chart of read vs. write times for the three benchmarked stores.
# NOTE(review): InfluxDB/Mongo/DuckDB/Kdb+ timings are collected above but not
# plotted yet — extend x, y1, y2 and the xtick labels to add them.
x = np.arange(3)  # one group per database; change here when adding stores
width = 0.40
y1 = [
    cHouse_read_execution_time,
    psql_read_execution_time,
    s3_read_execution_time,
]  # change here
y2 = [
    cHouse_write_execution_time,
    psql_write_execution_time,
    s3_write_execution_time,
]  # change here
# offsets of +/-0.2 equal width/2, so the paired bars touch without overlapping
plt.bar(x - 0.2, y1, width)
plt.bar(x + 0.2, y2, width)
plt.xticks(x, ["Click House", "Postgresql", "S3 Parquet"])
plt.xlabel("Databases")
plt.ylabel("Seconds")
plt.legend(["Read", "Write"])  # TODO: verify legend/bar order stays in sync
plt.show()
In [ ]: