Database/BigQuery

BigQuery에 Python으로 데이터(csv) 적재하기

dtstory 2022. 10. 19. 20:00

<빅 쿼리 데이터 적재>


# 모듈 로드

import glob
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes


# 서비스 계정 key(json) 경로 지정 
/my_path/serviceAccountKey 경로 내에 json key 파일을 위치시켜 둔다.
경로는 사용자가 원하는 곳에 두어도 무관하다.

key_path = glob.glob("/my_path/serviceAccountKey/*.json")[0]  # first JSON key found; raises IndexError if the folder holds none


# Create a Credentials object from the service-account key file

credentials = service_account.Credentials.from_service_account_file(key_path)


# Create the GCP BigQuery client, authenticated with those credentials
# and scoped to the key's own project

client = bigquery.Client(credentials = credentials,
                         project = credentials.project_id)


# table_id = "[project].[dataset].[table]" — fully qualified BigQuery destination

table_id ="my_prj.my_dataset.data2"


# Path of the local CSV file to load

file_path = './my_folder/my_data.csv'


# Configure and run the load job: CSV source, schema autodetected,
# WRITE_TRUNCATE replaces any existing contents of the destination table

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV, autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)
with open(file_path, "rb") as source_file:
    job=client.load_table_from_file(source_file, table_id, job_config=job_config)
    
job.result()  # block until the load job finishes (raises on failure)


# Check how many rows were saved

destination_table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))

적재된 rows 수가 원본 CSV의 행 수와 일치하면 데이터가 정상적으로 적재된 것이다.


<전체 코드>

 

# Load a local CSV file into a BigQuery table.
import glob
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes  # registers BigQuery dtypes for pandas interop

# Locate the service-account JSON key. Fail with a clear message instead of
# an opaque IndexError when the folder contains no key file.
key_files = glob.glob("/my_path/serviceAccountKey/*.json")
if not key_files:
    raise FileNotFoundError(
        "No service-account JSON key found in /my_path/serviceAccountKey/"
    )
key_path = key_files[0]

credentials = service_account.Credentials.from_service_account_file(key_path)

# BigQuery client authenticated with the key, scoped to the key's project.
client = bigquery.Client(credentials=credentials,
                         project=credentials.project_id)

# Destination: "[project].[dataset].[table]"
table_id = "my_prj.my_dataset.data2"

# Local CSV file to load.
file_path = './my_folder/my_data.csv'

# CSV source, schema autodetected; WRITE_TRUNCATE replaces any existing
# contents of the destination table.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
with open(file_path, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)

job.result()  # block until the load job finishes (raises on failure)

# Verify how many rows were actually loaded.
destination_table = client.get_table(table_id)  # Make an API request.
print(f"Loaded {destination_table.num_rows} rows.")

 

 

728x90