Python コードサンプル

YouTube Reporting APIYouTube Analytics API には、Python の Google API クライアント ライブラリを使用する次のコードサンプルが利用可能です。これらのコードサンプルは、GitHub の YouTube API コードサンプル リポジトリpython フォルダからダウンロードできます。

一括レポート

レポートを取得する

このコードサンプルは、特定のジョブによって作成されたレポートを取得する方法を示しています。jobs.list メソッドを呼び出して、レポートジョブを取得します。次に、特定のジョブ ID を jobId パラメータに設定して reports.list メソッドを呼び出し、そのジョブによって作成されたレポートを取得します。最後に、各レポートのダウンロード URL が出力されます。

#!/usr/bin/python

###
#
# This script retrieves YouTube Reporting API reports. Use cases:
# 1. If you specify a report URL, the script downloads that report.
# 2. Otherwise, if you specify a job ID, the script retrieves a list of
#    available reports for that job and prompts you to select a report.
#    Then it retrieves that report as in case 1.
# 3. Otherwise, the list retrieves a list of jobs for the user or,
#    if specified, the content owner that the user is acting on behalf of.
#    Then it prompts the user to select a job, and then executes case 2 and
#    then case 1.
# Usage examples:
# python retrieve_reports.py --content_owner_id=<CONTENT_OWNER_ID> --local_file=<LOCAL_FILE>
# python retrieve_reports.py --content_owner_id=<CONTENT_OWNER_ID> --job_id=<JOB_ID> --local_file=<LOCAL_FILE>
# python retrieve_reports.py --content_owner_id=<CONTENT_OWNER_ID> --report_url=<REPORT_URL> --local_file=<LOCAL_FILE>
#
###

import argparse
import os

import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
from google_auth_oauthlib.flow import InstalledAppFlow
from io import FileIO


# The CLIENT_SECRETS_FILE variable specifies the name of a file that contains
# the OAuth 2.0 information for this application, including its client_id and
# client_secret. You can acquire an OAuth 2.0 client ID and client secret from
# the {{ Google Cloud Console }} at
# {{ https://cloud.google.com/console }}.
# Please ensure that you have enabled the YouTube Data API for your project.
# For more information about using OAuth2 to access the YouTube Data API, see:
#   https://developers.google.com/youtube/v3/guides/authentication
# For more information about the client_secrets.json file format, see:
#   https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
CLIENT_SECRETS_FILE = 'client_secret.json'

# This OAuth 2.0 access scope allows for read access to YouTube Analytics
# monetary reports for the authenticated user's account. Any request that
# retrieves earnings or ad performance metrics must use this scope.
SCOPES = ['https://www.googleapis.com/auth/yt-analytics-monetary.readonly']
API_SERVICE_NAME = 'youtubereporting'
API_VERSION = 'v1'

# Authorize the request and store authorization credentials.
def get_authenticated_service():
  flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
  credentials = flow.run_console()
  return build(API_SERVICE_NAME, API_VERSION, credentials = credentials)

# Remove keyword arguments that are not set.
def remove_empty_kwargs(**kwargs):
  good_kwargs = {}
  if kwargs is not None:
    for key, value in kwargs.iteritems():
      if value:
        good_kwargs[key] = value
  return good_kwargs

# Call the YouTube Reporting API's jobs.list method to retrieve reporting jobs.
def list_reporting_jobs(youtube_reporting, **kwargs):
  # Only include the onBehalfOfContentOwner keyword argument if the user
  # set a value for the --content_owner argument.
  kwargs = remove_empty_kwargs(**kwargs)

  # Retrieve the reporting jobs for the user (or content owner).
  results = youtube_reporting.jobs().list(**kwargs).execute()

  if 'jobs' in results and results['jobs']:
    jobs = results['jobs']
    for job in jobs:
      print ('Reporting job id: %s\n name: %s\n for reporting type: %s\n'
        % (job['id'], job['name'], job['reportTypeId']))
  else:
    print 'No jobs found'
    return False

  return True

# Call the YouTube Reporting API's reports.list method to retrieve reports created by a job.
def retrieve_reports(youtube_reporting, **kwargs):
  # Only include the onBehalfOfContentOwner keyword argument if the user
  # set a value for the --content_owner argument.
  kwargs = remove_empty_kwargs(**kwargs)

  # Retrieve available reports for the selected job.
  results = youtube_reporting.jobs().reports().list(
    **kwargs
  ).execute()

  if 'reports' in results and results['reports']:
    reports = results['reports']
    for report in reports:
      print ('Report dates: %s to %s\n       download URL: %s\n'
        % (report['startTime'], report['endTime'], report['downloadUrl']))


# Call the YouTube Reporting API's media.download method to download the report.
def download_report(youtube_reporting, report_url, local_file):
  request = youtube_reporting.media().download(
    resourceName=' '
  )
  request.uri = report_url
  fh = FileIO(local_file, mode='wb')
  # Stream/download the report in a single request.
  downloader = MediaIoBaseDownload(fh, request, chunksize=-1)

  done = False
  while done is False:
    status, done = downloader.next_chunk()
    if status:
      print 'Download %d%%.' % int(status.progress() * 100)
  print 'Download Complete!'


# Prompt the user to select a job and return the specified ID.
def get_job_id_from_user():
  job_id = raw_input('Please enter the job id for the report retrieval: ')
  print ('You chose "%s" as the job Id for the report retrieval.' % job_id)
  return job_id

# Prompt the user to select a report URL and return the specified URL.
def get_report_url_from_user():
  report_url = raw_input('Please enter the report URL to download: ')
  print ('You chose "%s" to download.' % report_url)
  return report_url

if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--content_owner', default='',
      help='ID of content owner for which you are retrieving jobs and reports')
  parser.add_argument('--job_id', default=None,
      help='ID of the job for which you are retrieving reports. If not ' +
           'provided AND report_url is also not provided, then the script ' +
           'calls jobs.list() to retrieve a list of jobs.')
  parser.add_argument('--report_url', default=None,
      help='URL of the report to retrieve. If not specified, the script ' +
           'calls reports.list() to retrieve a list of reports for the ' +
           'selected job.')
  parser.add_argument('--local_file', default='yt_report.txt',
      help='The name of the local file where the downloaded report will be written.')
  args = parser.parse_args()

  youtube_reporting = get_authenticated_service()
  try:
    # If the user has not specified a job ID or report URL, retrieve a list
    # of available jobs and prompt the user to select one.
    if not args.job_id and not args.report_url:
      if list_reporting_jobs(youtube_reporting,
                             onBehalfOfContentOwner=args.content_owner):
        args.job_id = get_job_id_from_user()

    # If the user has not specified a report URL, retrieve a list of reports
    # available for the specified job and prompt the user to select one.
    if args.job_id and not args.report_url:
      retrieve_reports(youtube_reporting,
                       jobId=args.job_id,
                       onBehalfOfContentOwner=args.content_owner)
      args.report_url = get_report_url_from_user()

    # Download the selected report.
    if args.report_url:
      download_report(youtube_reporting, args.report_url, args.local_file)
  except HttpError, e:
    print 'An HTTP error %d occurred:\n%s' % (e.resp.status, e.content)

レポートジョブを作成する

このコードサンプルは、レポートジョブの作成方法を示しています。reportTypes.list メソッドを呼び出して、使用可能なレポートタイプのリストを取得します。次に、jobs.create メソッドを呼び出して、新しいレポートジョブを作成します。

#!/usr/bin/python

# Create a reporting job for the authenticated user's channel or
# for a content owner that the user's account is linked to.
# Usage example:
# python create_reporting_job.py --name='<name>'
# python create_reporting_job.py --content-owner='<CONTENT OWNER ID>'
# python create_reporting_job.py --content-owner='<CONTENT_OWNER_ID>' --report-type='<REPORT_TYPE_ID>' --name='<REPORT_NAME>'

import argparse
import os

import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow


# The CLIENT_SECRETS_FILE variable specifies the name of a file that contains

# the OAuth 2.0 information for this application, including its client_id and
# client_secret. You can acquire an OAuth 2.0 client ID and client secret from
# the {{ Google Cloud Console }} at
# {{ https://cloud.google.com/console }}.
# Please ensure that you have enabled the YouTube Data API for your project.
# For more information about using OAuth2 to access the YouTube Data API, see:
#   https://developers.google.com/youtube/v3/guides/authentication
# For more information about the client_secrets.json file format, see:
#   https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
CLIENT_SECRETS_FILE = 'client_secret.json'

# This OAuth 2.0 access scope allows for read access to the YouTube Analytics monetary reports for
# authenticated user's account. Any request that retrieves earnings or ad performance metrics must
# use this scope.
SCOPES = ['https://www.googleapis.com/auth/yt-analytics-monetary.readonly']
API_SERVICE_NAME = 'youtubereporting'
API_VERSION = 'v1'

# Authorize the request and store authorization credentials.
def get_authenticated_service():
  flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
  credentials = flow.run_console()
  return build(API_SERVICE_NAME, API_VERSION, credentials = credentials)

# Remove keyword arguments that are not set.
def remove_empty_kwargs(**kwargs):
  good_kwargs = {}
  if kwargs is not None:
    for key, value in kwargs.iteritems():
      if value:
        good_kwargs[key] = value
  return good_kwargs

# Call the YouTube Reporting API's reportTypes.list method to retrieve report types.
def list_report_types(youtube_reporting, **kwargs):
  # Provide keyword arguments that have values as request parameters.
  kwargs = remove_empty_kwargs(**kwargs)
  results = youtube_reporting.reportTypes().list(**kwargs).execute()
  reportTypes = results['reportTypes']

  if 'reportTypes' in results and results['reportTypes']:
    reportTypes = results['reportTypes']
    for reportType in reportTypes:
      print 'Report type id: %s\n name: %s\n' % (reportType['id'], reportType['name'])
  else:
    print 'No report types found'
    return False

  return True


# Call the YouTube Reporting API's jobs.create method to create a job.
def create_reporting_job(youtube_reporting, report_type_id, **kwargs):
  # Provide keyword arguments that have values as request parameters.
  kwargs = remove_empty_kwargs(**kwargs)

  reporting_job = youtube_reporting.jobs().create(
    body=dict(
      reportTypeId=args.report_type,
      name=args.name
    ),
    **kwargs
  ).execute()

  print ('Reporting job "%s" created for reporting type "%s" at "%s"'
         % (reporting_job['name'], reporting_job['reportTypeId'],
             reporting_job['createTime']))


# Prompt the user to enter a report type id for the job. Then return the id.
def get_report_type_id_from_user():
  report_type_id = raw_input('Please enter the reportTypeId for the job: ')
  print ('You chose "%s" as the report type Id for the job.' % report_type_id)
  return report_type_id

# Prompt the user to set a job name
def prompt_user_to_set_job_name():
  job_name = raw_input('Please set a name for the job: ')
  print ('Great! "%s" is a memorable name for this job.' % job_name)
  return job_name


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  # The 'name' option specifies the name that will be used for the reporting job.
  parser.add_argument('--content-owner', default='',
      help='ID of content owner for which you are retrieving jobs and reports.')
  parser.add_argument('--include-system-managed', default=False,
      help='Whether the API response should include system-managed reports')
  parser.add_argument('--name', default='',
    help='Name for the reporting job. The script prompts you to set a name ' +
         'for the job if you do not provide one using this argument.')
  parser.add_argument('--report-type', default=None,
    help='The type of report for which you are creating a job.')
  args = parser.parse_args()

  youtube_reporting = get_authenticated_service()

  try:
    # Prompt user to select report type if they didn't set one on command line.
    if not args.report_type:
      if list_report_types(youtube_reporting,
                           onBehalfOfContentOwner=args.content_owner,
                           includeSystemManaged=args.include_system_managed):
        args.report_type = get_report_type_id_from_user()
    # Prompt user to set job name if not set on command line.
    if not args.name:
      args.name = prompt_user_to_set_job_name()
    # Create the job.
    if args.report_type:
      create_reporting_job(youtube_reporting,
                           args,
                           onBehalfOfContentOwner=args.content_owner)
  except HttpError, e:
    print 'An HTTP error %d occurred:\n%s' % (e.resp.status, e.content)

ターゲット クエリ レポート

毎日のチャンネル統計情報を取得する

この例では、YouTube Analytics API を呼び出して、認証ユーザーのチャンネルの 1 日あたりの視聴回数とその他の指標(2017 年)を取得します。サンプルでは、Google API Python クライアント ライブラリを使用しています。

このコードは、https://www.googleapis.com/auth/yt-analytics.readonly スコープにアクセスするユーザーに権限をリクエストします。

SCOPES = ['https://www.googleapis.com/auth/yt-analytics.readonly']

場合によっては、アプリケーションで他のスコープへのアクセス権をリクエストする必要があります。たとえば、YouTube Analytics API と YouTube Data API を呼び出すアプリケーションでは、ユーザーが自分の YouTube アカウントへのアクセス権も付与する必要がある場合があります。承認の概要では、YouTube Analytics API を呼び出すアプリケーションで通常使用されるスコープについて説明しています。

認証情報を設定する

このサンプルを初めてローカルで実行する前に、プロジェクトの認証情報を設定する必要があります。

  1. Google API Console でプロジェクトを作成または選択します。
  2. プロジェクトで YouTube Analytics API を有効にします。
  3. [認証情報] ページの上部にある [OAuth 同意画面] タブを選択します。[メールアドレス] を選択し、まだ設定していない場合はプロダクト名を入力し、[保存] ボタンをクリックします。
  4. [認証情報] ページで [認証情報を作成] ボタンをクリックし、[OAuth クライアント ID] を選択します。
  5. アプリケーション タイプとして [Other] を選択し、「YouTube Analytics API Quickstart」という名前を入力して [作成] ボタンをクリックします。
  6. [OK] をクリックして、表示されるダイアログを閉じます。
  7. クライアント ID の右側にある [](JSON をダウンロード)ボタンをクリックします。
  8. ダウンロードしたファイルを作業ディレクトリに移動します。

必要なライブラリをインストールする

また、Python 用の Google API クライアント ライブラリとその他のライブラリもインストールする必要があります。

pip install --upgrade google-api-python-client
pip install --upgrade google-auth google-auth-oauthlib google-auth-httplib2

コードの実行

これで、サンプルを実際にテストする準備が整いました。

  1. 以下のコードサンプルを作業ディレクトリにコピーします。
  2. このサンプルでは、認証情報の設定後にダウンロードしたファイルの場所と一致するように CLIENT_SECRETS_FILE 変数の値を更新します。
  3. ターミナル ウィンドウでサンプルコードを実行します。
    python yt_analytics_v2.py
  4. 承認フローを実行します。認証フローがブラウザに自動的に読み込まれる場合と、認証 URL をブラウザ ウィンドウにコピーしなければならない場合があります。承認フローの最後に、必要に応じて、ブラウザに表示された認証コードをターミナル ウィンドウに貼り付けて、[return] をクリックします。
  5. API クエリが実行され、JSON レスポンスがターミナル ウィンドウに出力されます。

サンプルコード

# -*- coding: utf-8 -*-

import os
import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow

SCOPES = ['https://www.googleapis.com/auth/yt-analytics.readonly']

API_SERVICE_NAME = 'youtubeAnalytics'
API_VERSION = 'v2'
CLIENT_SECRETS_FILE = 'YOUR_CLIENT_SECRET_FILE.json'
def get_service():
  flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
  credentials = flow.run_console()
  return build(API_SERVICE_NAME, API_VERSION, credentials = credentials)

def execute_api_request(client_library_function, **kwargs):
  response = client_library_function(
    **kwargs
  ).execute()

  print(response)

if __name__ == '__main__':
  # Disable OAuthlib's HTTPs verification when running locally.
  # *DO NOT* leave this option enabled when running in production.
  os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'

  youtubeAnalytics = get_service()
  execute_api_request(
      youtubeAnalytics.reports().query,
      ids='channel==MINE',
      startDate='2017-01-01',
      endDate='2017-12-31',
      metrics='estimatedMinutesWatched,views,likes,subscribersGained'
      dimensions='day',
      sort='day'
  )