You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
24 KiB
24 KiB
None
<html>
<head>
</head>
</html>
Benchmark: ClickHouse Vs. InfluxDB Vs. Postgresql Vs. Parquet¶
How to use:¶
- Rename the file "properties-model.ini" to "properties.ini"
- Fill with your own credentials
The purpose of this work is to compare read/write speed for a medium-sized dataset (9 columns and 50,000 rows) across the following databases and storage formats:
- ClickHouse
- InfluxDB
- Postgresql
- Parquet (in a S3 Minio Storage)
- DuckDB with Polars
- MongoDB
- Kdb+
- Clickhouse read
Deve-se relevar: é uma "cold-storage" ou "freeze-storage"? InfluxDB: alta leitura e tem a vantagem da indexação para visualização de dados em gráficos.
notas:
- comparar tamanho do csv com parquet
Imports¶
In [74]:
import configparser
from datetime import datetime
import influxdb_client
import pandas as pd
from clickhouse_driver import Client
from dotenv import load_dotenv
from minio import Minio
from pymongo import MongoClient
from pytz import timezone
from sqlalchemy import create_engine
load_dotenv()
# import io
# import time
# import numpy as np
# import clickhouse_connect
# pip install python-dotenv
# import psycopg2
# import os
# import pyarrow as pa
# import pyarrow.parquet as pq
# import s3fs
# from friendly.jupyter import Friendly
# from minio.error import S3Error
# from pyarrow import Table
# import os
# from influxdb_client import InfluxDBClient, Point, WritePrecision
# from influxdb_client.client.write_api import SYNCHRONOUS
# Friendly.dark()
Out[74]:
In [ ]:
# test
In [ ]:
# Variables
# Name of the ClickHouse table used by the CREATE TABLE / INSERT / SELECT
# cells of the ClickHouse section.
dbname = "EURUSDtest"
In [ ]:
# Load the connection settings for every backend from properties.ini
# (rename "properties-model.ini" to "properties.ini" and fill in credentials).
arq = configparser.RawConfigParser()
# RawConfigParser.read() silently skips files it cannot open and returns the
# list of files it actually parsed. Fail early with a clear message instead of
# a confusing NoSectionError on the first arq.get() call below.
if not arq.read("properties.ini"):
    raise FileNotFoundError(
        'properties.ini not found: rename "properties-model.ini" to '
        '"properties.ini" and fill in your own credentials'
    )

# ClickHouse
ClickHouseUser = arq.get("CLICKHOUSE", "user")
ClickHouseKey = arq.get("CLICKHOUSE", "key")
ClickHouseUrl = arq.get("CLICKHOUSE", "url")

# InfluxDB
InfluxDBUser = arq.get("INFLUXDB", "user")
InfluxDBKey = arq.get("INFLUXDB", "key")
InfluxDBUrl = arq.get("INFLUXDB", "url")
InfluxDBBucket = arq.get("INFLUXDB", "bucket")

# PostgreSQL
PostgresqlUser = arq.get("POSTGRESQL", "user")
PostgresqlKey = arq.get("POSTGRESQL", "key")
PostgresqlUrl = arq.get("POSTGRESQL", "url")
PostgresqlDB = arq.get("POSTGRESQL", "database")

# S3 / MinIO
S3MinioUser = arq.get("S3MINIO", "user")
S3MinioKey = arq.get("S3MINIO", "key")
S3MinioUrl = arq.get("S3MINIO", "url")
S3MinioRegion = arq.get("S3MINIO", "region")

# MongoDB
MongoUser = arq.get("MONGODB", "user")
MongoKey = arq.get("MONGODB", "key")
MongoUrl = arq.get("MONGODB", "url")
In [ ]:
%%time
# Load Dataset
# Reads out.csv from the working directory, using the first CSV column as the
# dataframe index.
df = pd.read_csv("out.csv", index_col=0)
In [ ]:
# df.head()
In [ ]:
# Transform datetime: convert the "from" and "to" columns, interpreting their
# values as seconds since the epoch (unit="s").
# Optional: only needed when the columns have not been transformed yet.
for column in ("from", "to"):
    df[column] = pd.to_datetime(df[column], unit="s")
Funções¶
-> Class
In [ ]:
def timestamp2dataHora(x, timezone_="America/Sao_Paulo"):
    """Convert a timestamp into a timezone-aware ``datetime``.

    Args:
        x: Timestamp handed to ``datetime.fromtimestamp``.
        timezone_: Time zone name passed to ``timezone`` (imported from pytz);
            defaults to "America/Sao_Paulo".

    Returns:
        The ``datetime`` for ``x`` expressed in the requested time zone.
    """
    target_tz = timezone(timezone_)
    return datetime.fromtimestamp(x, tz=target_tz)
ClickHouse¶
In [ ]:
# NOTE: the official client uses an HTTP driver; in this example we use the
# recommended third-party library clickhouse_driver, which uses TCP instead.
# Connection details come from the CLICKHOUSE section of properties.ini.
client = Client(
host=ClickHouseUrl,
user=ClickHouseUser,
password=ClickHouseKey,
settings={"use_numpy": True},
)
In [ ]:
# Create the table in ClickHouse (IF NOT EXISTS, so re-running is harmless).
# TODO: !! change the column types !!
# ENGINE note: tables using the 'Memory' engine disappear when the server is
# restarted; MergeTree is used here.
client.execute(
    f"CREATE TABLE IF NOT EXISTS {dbname} (id UInt32,"
    "from DateTime, at UInt64, to DateTime, open Float64,"
    "close Float64, min Float64, max Float64, volume UInt32)"
    "ENGINE MergeTree ORDER BY to"
)
In [ ]:
%%time
# Write the dataframe into the ClickHouse table named by `dbname`.
client.insert_dataframe(f"INSERT INTO {dbname} VALUES", df)
In [ ]:
%%time
# Read the whole table back as a dataframe (optionally append "LIMIT 10000").
client.query_dataframe(f"SELECT * FROM default.{dbname}")
In [ ]:
%%time
# Read the whole table back and rebind `df` to the result.
df = pd.DataFrame(client.query_dataframe(f"SELECT * FROM default.{dbname}"))
InfluxDB¶
In [ ]:
# InfluxDB client. Note that this rebinds `client`, replacing the ClickHouse
# client created in the ClickHouse section.
# The organisation (`org`) is taken from the "user" entry of the INFLUXDB
# section of properties.ini; the token comes from its "key" entry.
client = influxdb_client.InfluxDBClient(
url=InfluxDBUrl, token=InfluxDBKey, org=InfluxDBUser
)
In [ ]:
# Re-read the CSV without an index column, parse the "from" column as dates
# and use it as the dataframe index (other indexes are still to be tested).
df = pd.read_csv(
    "out.csv", sep=",", index_col=False, parse_dates=["from"]
).set_index("from")
In [ ]:
# Preview the first rows of the dataframe that will be written to InfluxDB.
df.head()
In [ ]:
%%time
# Writing... it took a while, but it worked.
# The dataframe is written to the configured bucket with measurement name "id"
# and the "volume" column used as a tag.
with client.write_api() as influx_writer:
    influx_writer.write(
        bucket=InfluxDBBucket,
        record=df,
        data_frame_measurement_name="id",
        data_frame_tag_columns=["volume"],
    )
In [ ]:
# data
# |> pivot(
# rowKey:["_time"],
# columnKey: ["_field"],
# valueColumn: "_value"
# )
In [ ]:
# Read
Postgresql¶
In [ ]:
# Connect: build the SQLAlchemy engine for PostgreSQL (psycopg2 driver, port
# 5432) from the POSTGRESQL section of properties.ini.
engine = create_engine(
    f"postgresql+psycopg2://{PostgresqlUser}:{PostgresqlKey}"
    f"@{PostgresqlUrl}:5432/{PostgresqlDB}"
)
In [ ]:
# Drop old table and create new empty table
# df.head(0) carries only the column layout, so this (re)creates "comparedbs"
# with no rows.
# NOTE(review): if `df` still has "from" as its index (set in the InfluxDB
# section), index=False leaves that column out of the table — confirm intended.
df.head(0).to_sql("comparedbs", engine, if_exists="replace", index=False)
In [ ]:
%%time
# Write: bulk-load the dataframe into "comparedbs" through psycopg2's
# copy_from, using an in-memory tab-separated dump of the dataframe.
import io  # the notebook-level `import io` is commented out; needed for StringIO

conn = engine.raw_connection()
try:
    cur = conn.cursor()
    try:
        output = io.StringIO()
        df.to_csv(output, sep="\t", header=False, index=False)
        output.seek(0)  # rewind so copy_from reads from the start
        # The previous `contents = output.getvalue()` copy was never used and
        # only inflated the time measured for this cell, so it was removed.
        cur.copy_from(output, "comparedbs")  # , null="")  # null values become ''
        conn.commit()
    finally:
        # Always release the cursor, even when the COPY fails.
        cur.close()
finally:
    # Always close the connection, even when the COPY fails.
    conn.close()
In [ ]:
# Read
S3 Parquet¶
In [ ]:
# fs = s3fs.S3FileSystem(
# anon=False,
# use_ssl=False,
# client_kwargs={
# "region_name": S3MinioRegion,
# "endpoint_url": S3MinioUrl,
# "aws_access_key_id": S3MinioUser,
# "aws_secret_access_key": S3MinioKey,
# "verify": False,
# },
# )
In [72]:
# TODO: try without the function wrapper to see whether timing improves.
# TODO: check that the files in the git folder are on the SSD.
def main():
    """Upload the local Parquet file to the MinIO bucket ``data``.

    Creates the bucket ``data`` when it does not exist yet, then uploads the
    local file ``data/data.parquet`` as the object ``data.parquet``.

    The endpoint, region and credentials come from the S3MINIO settings loaded
    from properties.ini (``S3MinioUrl``, ``S3MinioRegion``, ``S3MinioUser``,
    ``S3MinioKey``) instead of being hard-coded in the notebook.

    Errors raised by the MinIO client are not caught here; they propagate to
    the caller.
    """
    # Use the configured credentials: secrets must not live in the source.
    minio_client = Minio(
        S3MinioUrl,
        secure=False,
        region=S3MinioRegion,
        access_key=S3MinioUser,
        secret_key=S3MinioKey,
    )

    # Make the 'data' bucket if it does not exist.
    if minio_client.bucket_exists("data"):
        print("Bucket 'data' already exists")
    else:
        minio_client.make_bucket("data")

    # Upload 'data/data.parquet' as object name 'data.parquet' to bucket 'data'.
    minio_client.fput_object(
        "data",
        "data.parquet",
        "data/data.parquet",
    )
    # print(
    #     "'data/data.parquet' is successfully uploaded as "
    #     "object 'data.parquet' to bucket 'data'."
    # )
In [73]:
%%time
# The notebook-level `from minio.error import S3Error` is commented out, so
# the except clause below would raise NameError exactly when an upload error
# occurs; import the exception here.
from minio.error import S3Error

# Write the dataframe to a local Parquet file, then upload it with main().
df.to_parquet("data/data.parquet")
if __name__ == "__main__":
    try:
        main()
    except S3Error as exc:
        print("error occurred.", exc)
In [71]:
# Read the local Parquet file back with the pyarrow engine and preview it.
# NOTE(review): `pq` here is a dataframe; the commented-out cells in this
# section use `pq` as the pyarrow.parquet module alias — the names would clash
# if those cells were re-enabled.
pq = pd.read_parquet("data/data.parquet", engine="pyarrow")
pq.head()
Out[71]:
In [ ]:
# # from friendly.jupyter import Friendly
# path_to_s3_object = "http://192.168.1.125:9000/obsidian/sample.parquet"
# # df = to_df(data)
# df.to_parquet("data/data.parquet")
In [ ]:
# with fs.open("obsidian/data.parquet", "wb") as f:
# df.to_parquet(f)
In [ ]:
# s3_filepath = "obsidian/data.parquet"
# pq.write_to_dataset(
# Table.from_pandas(df),
# s3_filepath,
# filesystem=fs,
# use_dictionary=True,
# compression="snappy",
# version="2.4",
# )
In [ ]:
# pq.write_to_dataset(
# Table.from_pandas(df),
# path_to_s3_object,
# filesystem=fs,
# use_dictionary=True,
# compression="snappy",
# version="2.0",
# )
In [ ]:
# path_to_s3_object = "s3://sample-bucket/path/to/sample.parquet"
# data = [
# {
# "hoge": 1,
# "foo": "blah",
# },
# {
# "boo": "test",
# "bar": 123,
# },
# ]
# df = to_df(data)
# pq.write_to_dataset(
# Table.from_pandas(df),
# path_to_s3_object,
# filesystem=fs,
# use_dictionary=True,
# compression="snappy",
# version="2.0",
# )
MongoDB¶
In [ ]:
# MongoDB client built from the "url" entry of the MONGODB config section.
# Note that this rebinds `client` once more.
client = MongoClient(MongoUrl);
In [ ]:
# NOTE(review): indexing a MongoClient presumably selects a *database*, so
# despite the "collection_name" key this would be a database handle rather
# than a collection — confirm the intent.
DB = client["collection_name"]
In [ ]:
# Handle obtained by indexing the client with "test".
# NOTE: `db` differs from the `DB` name used in the previous cell only by case.
db = client["test"]
DuckDB¶
In [ ]:
Kdb+¶
In [ ]:
In [ ]: